Un semáforo es un tipo de objeto de sincronización que se utiliza para proteger secciones de código que no pueden ser ejecutadas simultáneamente por más de un hilo. En Python, los semáforos se proporcionan a través de la clase Semaphore en el módulo threading.
Un semáforo mantiene un contador interno que se decrementa cada vez que un hilo adquiere el semáforo y se incrementa cuando un hilo libera el semáforo. Cuando el contador llega a cero, los hilos subsiguientes que intenten adquirir el semáforo se bloquearán hasta que otro hilo libere el semáforo.
Los semáforos funcionan a partir del número máximo de hilos, son como cerradoras con contador:
Nota: BoundedSemaphore es un tipo especial de semáforo. Un Semaphore puede ser liberado más veces de las que es adquirido, lo que aumentará su contador por encima del valor inicial. Un BoundedSemaphore no puede ser aumentado por encima del valor inicial.
Este ejemplo simula una lista en la que sólo coexiste un máximo de 3 elementos. Creamos 10 hilos, cada uno de los cuales intenta insertar y borrar elementos de una lista, con una pausa de 1/2 segundo, para permitir cambios de contexto intercalados.
Como sólo tres hilos pueden estar al mismo tiempo en la región crítica, la lista nunca tendrá más de tres elementos ya que pondremos un semáforo que lo garantice.
Las operaciones de insercción y eliminación en la lista se han protegido en el mismo lock. Las operaciones de acquire y release se efectúan de forma implícita dentro del bloque with
.
import threading
import time
import logging
logging.basicConfig(level=logging.DEBUG,
format='(%(threadName)-9s) %(message)s',)
class ThreadPool(object):
def __init__(self):
super(ThreadPool, self).__init__()
self.active = []
self.lock = threading.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
logging.debug('Running: %s', self.active)
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
logging.debug('Running: %s', self.active)
def f(s, pool):
logging.debug('Esperando para unirse al grupo')
with s:
name = threading.current_thread().name
pool.makeActive(name)
time.sleep(0.5)
pool.makeInactive(name)
pool = ThreadPool()
s = threading.Semaphore(3)
for i in range(10):
t = threading.Thread(target=f, name='thread_'+str(i), args=(s, pool))
t.start()
(thread_0 ) Esperando para unirse al grupo (thread_0 ) Running: ['thread_0'] (thread_1 ) Esperando para unirse al grupo (thread_2 ) Esperando para unirse al grupo (thread_1 ) Running: ['thread_0', 'thread_1'] (thread_3 ) Esperando para unirse al grupo (thread_4 ) Esperando para unirse al grupo (thread_2 ) Running: ['thread_0', 'thread_1', 'thread_2'] (thread_5 ) Esperando para unirse al grupo (thread_6 ) Esperando para unirse al grupo (thread_7 ) Esperando para unirse al grupo (thread_8 ) Esperando para unirse al grupo (thread_9 ) Esperando para unirse al grupo (thread_0 ) Running: ['thread_1', 'thread_2'] (thread_3 ) Running: ['thread_1', 'thread_2', 'thread_3'] (thread_1 ) Running: ['thread_2', 'thread_3'] (thread_4 ) Running: ['thread_2', 'thread_3', 'thread_4'] (thread_2 ) Running: ['thread_3', 'thread_4'] (thread_5 ) Running: ['thread_3', 'thread_4', 'thread_5'] (thread_3 ) Running: ['thread_4', 'thread_5'] (thread_6 ) Running: ['thread_4', 'thread_5', 'thread_6'] (thread_4 ) Running: ['thread_5', 'thread_6'] (thread_7 ) Running: ['thread_5', 'thread_6', 'thread_7'] (thread_5 ) Running: ['thread_6', 'thread_7'] (thread_8 ) Running: ['thread_6', 'thread_7', 'thread_8'] (thread_6 ) Running: ['thread_7', 'thread_8'] (thread_9 ) Running: ['thread_7', 'thread_8', 'thread_9'] (thread_8 ) Running: ['thread_7', 'thread_9'] (thread_7 ) Running: ['thread_9'] (thread_9 ) Running: []
Otro ejemplo:
import threading
import time
# Crea un semáforo que permite a dos hilos entrar a la sección crítica
sem = threading.Semaphore(2)
def seccion_critica():
print(f"{threading.current_thread().name} entrando...")
time.sleep(1)
print(f"{threading.current_thread().name} saliendo...")
def hilo(sem):
print(f"{threading.current_thread().name} esperando...")
with sem:
seccion_critica()
hilos = []
for i in range(4):
t = threading.Thread(target=hilo, args=(sem,))
hilos.append(t)
t.start()
for t in hilos:
t.join()
Thread-1 (hilo) esperando... Thread-1 (hilo) entrando... Thread-2 (hilo) esperando... Thread-2 (hilo) entrando... Thread-3 (hilo) esperando... Thread-4 (hilo) esperando... Thread-1 (hilo) saliendo... Thread-3 (hilo) entrando... Thread-2 (hilo) saliendo... Thread-4 (hilo) entrando... Thread-3 (hilo) saliendo... Thread-4 (hilo) saliendo...
En este código, creamos un semáforo que permite a dos hilos entrar a la sección crítica al mismo tiempo. Cada hilo intenta adquirir el semáforo antes de entrar a la sección crítica. Si el semáforo ha sido adquirido por dos hilos, los hilos subsiguientes tendrán que esperar hasta que uno de los hilos en la sección crítica libere el semáforo. Esto asegura que no más de dos hilos pueden estar en la sección crítica al mismo tiempo.
Éste es uno de las primitivas de sincronización más antiguos en la historia de las ciencias de la computación, inventado por el pionero en ciencias de la computación holandés Edsger W. Dijkstra (él utilizó los nombres P() y V() en lugar de acquire() y release())
Un semáforo administra un contador interno que se disminuye por cada llamada a acquire() y se incrementa por cada llamada a release(). El contador no puede bajar de cero; cuando acquire() lo encuentra en cero, bloquea, esperando hasta que otro hilo llame release().
Los semáforos también tienen soporte para el protocolo de gestión de contexto.
class threading.Semaphore(value=1)
Esta clase implementa los objetos semáforo. Un semáforo gestiona un contador atómico que representa el número de llamadas a release() menos el número de llamadas a acquire(), más un valor inicial. El método acquire() bloquea si es necesario, hasta que pueda retornar sin volver el contador negativo. Si no es provisto, el valor por defecto de value será 1.
El argumento opcional da el value inicial al contador interno; por defecto es 1. Si el value provisto es menor a 0; se lanza un ValueError.
Distinto en la versión 3.3: cambiado de función de fábrica a una clase.
acquire(blocking=True, timeout=None)
Adquirir un semáforo.
Cuando se invoca sin argumentos:
Si el contador interno es mayor a cero de entrada, lo disminuye en uno y retorna True inmediatamente.
Si el contador interno es cero de entrada, bloquea hasta ser despertado por una llamada a release(). Una vez despierto (y el contador sea mayor a 0), disminuye el contador en 1 y retorna True. Se despertará exactamente un hilo por cada llamada a release(). No debiese confiarse en el orden en que los hilos sean despertados.
When invoked with blocking set to False, do not block. If a call without an argument would block, return False immediately; otherwise, do the same thing as when called without arguments, and return True.
Cuando se invoca con timeout distinto de None, bloqueará por un tiempo máximo en segundos fijados en timeout. Si acquire no se completa exitosamente en ese intervalo, retorna False. De otro modo retorna True.
Distinto en la versión 3.2: El parámetro timeout es nuevo.
release(n=1)
Suelta un semáforo, incrementando el contador interno por n. Cuando era cero en la entrada y otros subprocesos están esperando que vuelva a ser mayor que cero, active n de esos subprocesos.
Distinto en la versión 3.9: Se agregó el parámetro n para liberar varios subprocesos en espera a la vez.
class threading.BoundedSemaphore(value=1)
Clase que implementa objetos de semáforo delimitados. Un semáforo delimitado verifica que su valor actual no exceda su valor inicial. Si lo hace, se lanza un ValueError. En la mayoría de las situaciones se utilizan los semáforos para cuidar recursos con capacidad limitada. Si se libera el semáforo demasiadas veces es signo de un bug. Si no se provee, el valor por defecto de value será 1.
Distinto en la versión 3.3: cambiado de función de fábrica a una clase.
Ejemplo de Semaphore
Los semáforos suelen utilizarse para cuidar recursos con capacidad limitada, por ejemplo, un servidor de base de datos. En cualquier situación en que el tamaño de los recursos sea fijo, se debe usar un semáforo delimitado. Antes de generar cualquier hilo de trabajo, tu hilo principal debe inicializar el semáforo:
maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)
Una vez que han sido generados, los hilos de trabajo llaman a los métodos acquire y release cuando necesitan conectarse al servidor:
with pool_sema:
conn = connectdb()
try:
# ... use connection ...
finally:
conn.close()
El uso de semáforos delimitados reduce la posibilidad de que pase inadvertido un error de programación que cause que el semáforo sea liberado más veces de las que sea adquirido.