Sincronización y Compartición de Recursos entre Hilos
La sincronización es uno de los aspectos más críticos y desafiantes de la programación multihilo. Cuando múltiples hilos acceden y modifican recursos compartidos, es necesario coordinar su acceso para evitar inconsistencias y errores.
El problema de los recursos compartidos
Cuando múltiples hilos acceden simultáneamente a datos compartidos, pueden surgir problemas si no se implementa una sincronización adecuada. Veamos un ejemplo clásico:
var counter = 0
fun main() {
val threads = List(10) {
Thread {
repeat(1000) {
counter++ // ¡Operación NO atómica!
}
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
println("Valor esperado: 10000")
println("Valor real: $counter") // Probablemente menor que 10000
}
El código anterior producirá un valor menor que 10000 en la mayoría de las ejecuciones. Esto se debe a que la operación counter++ no es atómica y consiste en tres pasos:
- Leer el valor de
counter - Incrementar el valor
- Escribir el nuevo valor
Si dos hilos ejecutan estos pasos al mismo tiempo, pueden sobrescribir los cambios del otro.
Condiciones de carrera (Race Conditions)
Una condición de carrera ocurre cuando el resultado de un programa depende del orden de ejecución de los hilos, lo cual no está controlado. Esto genera comportamientos impredecibles y errores difíciles de reproducir.
Ejemplo de condición de carrera
class BankAccount(private var balance: Double = 0.0) {
fun deposit(amount: Double) {
val newBalance = balance + amount
Thread.sleep(1) // Simula procesamiento
balance = newBalance
}
fun getBalance() = balance
}
fun main() {
val account = BankAccount(1000.0)
val threads = List(5) {
Thread {
repeat(10) {
account.deposit(100.0)
}
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
println("Balance esperado: ${1000 + (5 * 10 * 100)}")
println("Balance real: ${account.getBalance()}")
}
Mecanismos de sincronización
Kotlin y la JVM proporcionan varios mecanismos para sincronizar el acceso a recursos compartidos:
1. Palabra clave synchronized
El bloque synchronized garantiza que solo un hilo pueda ejecutar el código dentro del bloque a la vez:
class SafeCounter {
private var count = 0
@Synchronized
fun increment() {
count++
}
@Synchronized
fun getCount() = count
}
fun main() {
val counter = SafeCounter()
val threads = List(10) {
Thread {
repeat(1000) {
counter.increment()
}
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
println("Valor final: ${counter.getCount()}") // Siempre será 10000
}
También podemos usar bloques synchronized en cualquier objeto:
class BankAccount(private var balance: Double = 0.0) {
private val lock = Any()
fun deposit(amount: Double) {
synchronized(lock) {
val newBalance = balance + amount
Thread.sleep(1)
balance = newBalance
}
}
fun withdraw(amount: Double): Boolean {
synchronized(lock) {
if (balance >= amount) {
balance -= amount
return true
}
return false
}
}
fun getBalance() = synchronized(lock) { balance }
}
Cada objeto en Java/Kotlin tiene un monitor implícito. Cuando un hilo entra en un bloque synchronized, adquiere el bloqueo del monitor y otros hilos deben esperar hasta que se libere.
2. Clases atómicas
Java proporciona clases atómicas en el paquete java.util.concurrent.atomic que garantizan operaciones thread-safe sin necesidad de sincronización explícita:
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
class AtomicCounter {
private val count = AtomicInteger(0)
fun increment() {
count.incrementAndGet()
}
fun getCount() = count.get()
}
fun main() {
val counter = AtomicCounter()
val threads = List(10) {
Thread {
repeat(1000) {
counter.increment()
}
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
println("Valor final: ${counter.getCount()}") // Siempre será 10000
}
Operaciones atómicas comunes
import java.util.concurrent.atomic.AtomicInteger
fun main() {
val atomic = AtomicInteger(0)
// Operaciones básicas
atomic.set(10)
val value = atomic.get()
// Incremento/Decremento
atomic.incrementAndGet() // ++i
atomic.getAndIncrement() // i++
atomic.decrementAndGet() // --i
atomic.getAndDecrement() // i--
// Operaciones complejas
atomic.addAndGet(5)
atomic.getAndAdd(5)
// Compare and Set (CAS)
val success = atomic.compareAndSet(10, 20) // Si es 10, lo cambia a 20
println("Valor final: ${atomic.get()}")
}
3. Locks explícitos (ReentrantLock)
Los locks explícitos ofrecen más flexibilidad que synchronized:
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
class BankAccountWithLock(private var balance: Double = 0.0) {
private val lock = ReentrantLock()
fun deposit(amount: Double) {
lock.withLock {
balance += amount
}
}
fun withdraw(amount: Double): Boolean {
lock.withLock {
if (balance >= amount) {
balance -= amount
return true
}
return false
}
}
fun getBalance() = lock.withLock { balance }
fun tryTransfer(to: BankAccountWithLock, amount: Double): Boolean {
// Intentar adquirir ambos locks
if (lock.tryLock()) {
try {
if (to.lock.tryLock()) {
try {
if (balance >= amount) {
balance -= amount
to.balance += amount
return true
}
} finally {
to.lock.unlock()
}
}
} finally {
lock.unlock()
}
}
return false
}
}
- Permite intentar adquirir el lock sin bloquearse (
tryLock()) - Soporta timeout en la adquisición del lock
- Puede ser interrumpido mientras espera el lock
- Soporta múltiples condiciones de espera
- Proporciona más información sobre el estado del lock
4. ReadWriteLock
Optimiza el acceso cuando hay muchas lecturas y pocas escrituras:
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.read
import kotlin.concurrent.write
class SharedData {
private var data = mutableListOf<String>()
private val lock = ReentrantReadWriteLock()
fun read(): List<String> {
return lock.read {
data.toList() // Múltiples hilos pueden leer simultáneamente
}
}
fun write(value: String) {
lock.write {
data.add(value) // Solo un hilo puede escribir
}
}
}
fun main() {
val sharedData = SharedData()
// Múltiples lectores
val readers = List(5) { id ->
Thread {
repeat(10) {
val data = sharedData.read()
println("[Lector $id] Leyó: ${data.size} elementos")
Thread.sleep(100)
}
}
}
// Pocos escritores
val writers = List(2) { id ->
Thread {
repeat(5) {
sharedData.write("Dato-$id-$it")
println("[Escritor $id] Escribió dato")
Thread.sleep(200)
}
}
}
(readers + writers).forEach { it.start() }
(readers + writers).forEach { it.join() }
}
Interbloqueos (Deadlocks)
Un deadlock o interbloqueo ocurre cuando dos o más hilos están esperando indefinidamente por recursos que están siendo retenidos por los otros hilos.
Ejemplo de deadlock
val lock1 = Any()
val lock2 = Any()
fun thread1() {
synchronized(lock1) {
println("Thread 1: Tiene lock1")
Thread.sleep(100)
println("Thread 1: Esperando lock2...")
synchronized(lock2) {
println("Thread 1: Tiene lock1 y lock2")
}
}
}
fun thread2() {
synchronized(lock2) {
println("Thread 2: Tiene lock2")
Thread.sleep(100)
println("Thread 2: Esperando lock1...")
synchronized(lock1) {
println("Thread 2: Tiene lock2 y lock1")
}
}
}
fun main() {
Thread { thread1() }.start()
Thread { thread2() }.start()
// ¡DEADLOCK! Los hilos se quedan esperando indefinidamente
}
Condiciones para un deadlock (Condiciones de Coffman)
Un deadlock requiere estas cuatro condiciones simultáneamente:
- Exclusión mutua: Al menos un recurso debe estar en modo no compartible
- Retención y espera: Un hilo retiene recursos mientras espera otros
- No apropiación: Los recursos no pueden ser quitados forzosamente
- Espera circular: Existe una cadena circular de hilos esperando recursos
Prevención de deadlocks
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
class Account(val id: Int, var balance: Double) {
val lock = ReentrantLock()
}
fun transfer(from: Account, to: Account, amount: Double) {
// Ordenar los locks por ID para evitar deadlock
val (first, second) = if (from.id < to.id) {
Pair(from, to)
} else {
Pair(to, from)
}
first.lock.withLock {
second.lock.withLock {
if (from.balance >= amount) {
from.balance -= amount
to.balance += amount
println("Transferencia exitosa: $amount de ${from.id} a ${to.id}")
}
}
}
}
fun main() {
val account1 = Account(1, 1000.0)
val account2 = Account(2, 1000.0)
// Estos hilos no causarán deadlock gracias al ordenamiento
val t1 = Thread { transfer(account1, account2, 100.0) }
val t2 = Thread { transfer(account2, account1, 50.0) }
t1.start()
t2.start()
t1.join()
t2.join()
println("Saldo cuenta 1: ${account1.balance}")
println("Saldo cuenta 2: ${account2.balance}")
}
Colecciones concurrentes
Java proporciona colecciones thread-safe especializadas:
import java.util.concurrent.*
fun main() {
// Lista concurrente
val list = CopyOnWriteArrayList<String>()
// Mapa concurrente
val map = ConcurrentHashMap<String, Int>()
// Cola concurrente
val queue = ConcurrentLinkedQueue<String>()
// Cola bloqueante
val blockingQueue = LinkedBlockingQueue<String>(10)
// Ejemplo con ConcurrentHashMap
val threads = List(10) { id ->
Thread {
repeat(100) {
map.compute("counter") { _, current ->
(current ?: 0) + 1
}
}
}
}
threads.forEach { it.start() }
threads.forEach { it.join() }
println("Valor final: ${map["counter"]}") // Siempre será 1000
}
BlockingQueue para productor-consumidor
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread
fun main() {
val queue = ArrayBlockingQueue<Int>(10)
// Productor
val producer = thread(name = "Productor") {
repeat(20) { i ->
queue.put(i) // Bloquea si la cola está llena
println("[Productor] Producido: $i")
Thread.sleep(100)
}
queue.put(-1) // Señal de finalización
}
// Consumidor
val consumer = thread(name = "Consumidor") {
while (true) {
val item = queue.take() // Bloquea si la cola está vacía
if (item == -1) break
println("[Consumidor] Consumido: $item")
Thread.sleep(150)
}
}
producer.join()
consumer.join()
println("Finalizado")
}
Volatile y visibilidad de memoria
La palabra clave @Volatile garantiza que los cambios en una variable sean visibles inmediatamente para todos los hilos:
class SharedFlag {
@Volatile
var running = true
fun stop() {
running = false
}
}
fun main() {
val flag = SharedFlag()
val worker = Thread {
var count = 0
while (flag.running) {
count++
}
println("Hilo finalizado después de $count iteraciones")
}
worker.start()
Thread.sleep(1000)
println("Deteniendo hilo...")
flag.stop()
worker.join()
}
@Volatile solo garantiza visibilidad, no atomicidad. Para operaciones como incremento, usa clases atómicas o sincronización:
@Volatile var counter = 0
// counter++ NO es thread-safe, incluso con @Volatile
val atomicCounter = AtomicInteger(0)
// atomicCounter.incrementAndGet() SÍ es thread-safe
Semáforos
Los semáforos limitan el número de hilos que pueden acceder a un recurso:
import java.util.concurrent.Semaphore
import kotlin.concurrent.thread
fun main() {
val semaphore = Semaphore(3) // Máximo 3 hilos simultáneos
val threads = List(10) { id ->
thread {
println("[$id] Esperando permiso...")
semaphore.acquire() // Obtener permiso
try {
println("[$id] Tiene permiso, trabajando...")
Thread.sleep(2000)
println("[$id] Trabajo completado")
} finally {
semaphore.release() // Liberar permiso
}
}
}
threads.forEach { it.join() }
}
CountDownLatch y CyclicBarrier
CountDownLatch
Permite que uno o más hilos esperen hasta que se completen un conjunto de operaciones:
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
fun main() {
val latch = CountDownLatch(3)
repeat(3) { id ->
thread {
println("[$id] Iniciando tarea...")
Thread.sleep((1000..3000).random().toLong())
println("[$id] Tarea completada")
latch.countDown()
}
}
println("Esperando a que se completen las tareas...")
latch.await()
println("¡Todas las tareas completadas!")
}
CyclicBarrier
Permite que un conjunto de hilos esperen entre sí hasta que todos alcancen un punto común:
import java.util.concurrent.CyclicBarrier
import kotlin.concurrent.thread
fun main() {
val barrier = CyclicBarrier(3) {
println("¡Todos los hilos han llegado a la barrera!")
}
repeat(3) { id ->
thread {
println("[$id] Fase 1")
Thread.sleep(1000)
barrier.await()
println("[$id] Fase 2")
Thread.sleep(1000)
barrier.await()
println("[$id] Finalizado")
}
}
}
Ejemplo práctico: Sistema productor-consumidor completo
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
data class Task(val id: Int, val data: String)
class ProducerConsumerSystem {
private val queue = ArrayBlockingQueue<Task>(10)
private val producedCount = AtomicInteger(0)
private val consumedCount = AtomicInteger(0)
@Volatile private var running = true
fun produce(producerId: Int) {
thread(name = "Productor-$producerId") {
while (running) {
val taskId = producedCount.incrementAndGet()
val task = Task(taskId, "Datos del productor $producerId")
queue.put(task)
println("[P-$producerId] Produjo tarea $taskId (cola: ${queue.size})")
Thread.sleep((100..500).random().toLong())
}
}
}
fun consume(consumerId: Int) {
thread(name = "Consumidor-$consumerId") {
while (running) {
try {
val task = queue.poll(1, java.util.concurrent.TimeUnit.SECONDS)
if (task != null) {
consumedCount.incrementAndGet()
println("[C-$consumerId] Consumió tarea ${task.id}")
Thread.sleep((200..600).random().toLong())
}
} catch (e: InterruptedException) {
break
}
}
}
}
fun stop() {
running = false
}
fun getStats() = "Producidos: ${producedCount.get()}, Consumidos: ${consumedCount.get()}"
}
fun main() {
val system = ProducerConsumerSystem()
// Crear 2 productores y 3 consumidores
repeat(2) { system.produce(it) }
repeat(3) { system.consume(it) }
// Dejar ejecutar durante 10 segundos
Thread.sleep(10000)
system.stop()
Thread.sleep(2000) // Dar tiempo para finalizar
println("Estadísticas finales: ${system.getStats()}")
}
Conclusión
La sincronización es esencial para la programación multihilo correcta. Los puntos clave son:
- Identificar recursos compartidos
- Elegir el mecanismo de sincronización apropiado
- Evitar deadlocks mediante ordenamiento de locks
- Usar colecciones concurrentes cuando sea posible
- Minimizar la sección crítica
- Considerar usar abstracciones de más alto nivel (ExecutorService, corrutinas)
En la siguiente sección veremos cómo depurar y documentar aplicaciones multihilo efectivamente.