threading.Condition()
en Python es una herramienta de sincronización avanzada que se utiliza cuando es necesario coordinar la ejecución entre varios hilos. Su uso principal es gestionar situaciones en las que un hilo debe esperar a que ocurra una condición específica (un cambio de estado o una notificación) antes de continuar su ejecución.
threading.Condition()
?Condition
actúa como una especie de «puerta» que los hilos deben esperar que se abra. Esta puerta solo se abre cuando se cumple una condición específica, y para indicar este estado, se utilizan los métodos:
wait()
: Un hilo que llama a wait()
se detiene hasta que otro hilo le «notifica» que puede continuar.notify()
: Un hilo que llama a notify()
«despierta» a uno de los hilos que estén esperando en wait()
.notify_all()
: Similar a notify()
, pero en lugar de despertar un solo hilo, despierta a todos los hilos que estén en espera.Este mecanismo es útil en situaciones como:
threading.Condition
A continuación, se muestra un ejemplo de un productor y un consumidor que comparten un recurso (una lista de «tareas») y usan Condition
para coordinar la ejecución.
import threading
import time
import random
# Lista compartida entre productor y consumidor
tareas = []
# Creamos un objeto Condition
condicion = threading.Condition()
# Función del productor
def productor():
while True:
# Simulamos la creación de una tarea
tarea = random.randint(1, 100)
with condicion:
tareas.append(tarea)
print(f"Productor ha creado la tarea: {tarea}")
# Notificamos al consumidor que hay una nueva tarea
condicion.notify()
# Esperamos un poco antes de crear otra tarea
time.sleep(random.uniform(0.5, 1.5))
# Función del consumidor
def consumidor():
while True:
with condicion:
# Esperamos hasta que haya al menos una tarea en la lista
while not tareas:
print("Consumidor esperando tareas...")
condicion.wait()
# Consumimos la tarea
tarea = tareas.pop(0)
print(f"Consumidor ha procesado la tarea: {tarea}")
# Simulamos el tiempo que toma procesar la tarea
time.sleep(random.uniform(1, 2))
# Creación de hilos productor y consumidor
hilo_productor = threading.Thread(target=productor)
hilo_consumidor = threading.Thread(target=consumidor)
# Iniciamos los hilos
hilo_productor.start()
hilo_consumidor.start()
# Esperamos que ambos hilos terminen (en este caso no terminarán ya que están en un bucle infinito)
hilo_productor.join()
hilo_consumidor.join()
tareas
actúa como una cola compartida entre el productor y el consumidor.Condition
: condicion
es el objeto Condition
que se usa para coordinar la sincronización entre el productor y el consumidor.productor
:
Condition
con with condicion
, añade la tarea a la lista y luego usa condicion.notify()
para informar al consumidor de que hay una tarea disponible.consumidor
:
Condition
y verifica si hay tareas en la lista.condicion.wait()
, esperando que el productor le notifique con una nueva tarea.Condition
Este patrón de sincronización evita que el consumidor intente procesar una tarea cuando no hay ninguna disponible, y que el productor tenga que esperar que el consumidor esté listo para procesar. El Condition
maneja la coordinación y garantiza que el flujo entre productor y consumidor esté sincronizado.
explicación anteriror
En ocasiones no es posible garantizar una región de código con lock, ya que se pueden producir interbloqueos como podemos ver en el gráfico siguiente.
import threading
class Recurso:
def __init__(self):
self.lock = threading.Lock()
def acceder(self):
self.lock.acquire()
print("Recurso bloqueado.")
recurso = Recurso()
def hilo(recurso):
print("Intentando acceder al recurso...")
recurso.acceder()
# Bloqueamos el recurso manualmente antes de iniciar los hilos
recurso.lock.acquire()
hilos = []
for i in range(2):
t = threading.Thread(target=hilo, args=(recurso,))
hilos.append(t)
t.start()
for t in hilos:
t.join()
En este código, creamos una clase Recurso que tiene un Lock. Cuando un hilo intenta acceder al recurso, intenta adquirir el Lock. Sin embargo, antes de iniciar los hilos, adquirimos el Lock manualmente y nunca lo liberamos. Como resultado, cuando los hilos intentan adquirir el Lock, se bloquean porque el Lock ya ha sido adquirido y no se libera. Esto causa un bloqueo en el programa. Por favor, ten en cuenta que este código es solo para fines de demostración y no se recomienda bloquear un programa de esta manera en una situación real.
El objeto Condition tiene que estar inscrito en una zona de bloqueo lock. Veamos los principales métodos:
import threading
def ping(cond):
for _ in range(20):
with cond:
cond.wait()
print ("ping")
cond.notify()
def pong(cond):
for _ in range(20):
with cond:
cond.wait()
print ("pong")
cond.notify()
cond = threading.Condition()
t1= threading.Thread(target=ping, args=(cond,))
t2=threading.Thread(target=pong, args=(cond,))
t1.start()
t2.start()
with cond:
cond.notify()
Los dos hilos quedan en bloqueo wait(), hasta que el hilo principal envía la notificación de despertar (notify). En este momento, sólo uno de los hilos ejecutará el código (bloqueado mediante un lock implícito) que imprima su mensaje y envíe una señal que despierte al otro hilo. Esto se hace de forma alternativa hasta consumir todas las iteraciones. De esta forma podemos ver que, una vez lanzada la primera señal , los hilos se intercomunican entre sí.
Ejemplo condition lista:
import threading
# Creamos una condición
condicion = threading.Condition()
# Creamos una lista vacía
lista = []
# Creamos una función que agrega elementos a la lista
def agregar_elemento():
# Bloqueamos la condición
with condicion:
# Agregamos un elemento a la lista
lista.append(1)
# Notificamos a los hilos que están esperando
condicion.notify()
# Creamos una función que elimina elementos de la lista
def eliminar_elemento():
# Bloqueamos la condición
with condicion:
# Esperamos a que haya elementos en la lista
while not lista:
condicion.wait()
# Eliminamos un elemento de la lista
lista.pop()
# Creamos dos hilos
hilo1 = threading.Thread(target=agregar_elemento)
hilo2 = threading.Thread(target=eliminar_elemento)
# Iniciamos los hilos
hilo1.start()
hilo2.start()
# Esperamos a que los hilos terminen
hilo1.join()
hilo2.join()
Ejemplo:
import threading
import time
# Creamos un objeto Condition
condicion = threading.Condition()
# Creamos una lista para almacenar los hilos
hilos = []
# Definimos una función que simula una sección crítica
def seccion_critica():
# Adquirimos el candado
condicion.acquire
# Realizamos la tarea dentro de la sección crítica
print(f"Hilo {threading.current_thread().name} está dentro de la sección crítica")
time.sleep(0.3)
# Liberamos el candado
condicion.release
# Creamos 5 hilos
for i in range(5):
hilo = threading.Thread(target=seccion_critica, name=f"Hilo {i+1}")
hilos.append(hilo)
# Iniciamos los hilos
for hilo in hilos:
hilo.start()
# Esperamos a que los hilos terminen
for hilo in hilos:
hilo.join()
En este ejemplo, creamos un objeto Condition y lo usamos para controlar el acceso a una sección crítica por múltiples hilos. La sección crítica es simulada por la función seccion_critica(), que adquiere el candado usando with condicion, espera a que la sección crítica esté disponible usando condicion.wait(), realiza la tarea dentro de la sección crítica y luego libera el candado usando condicion.release(). Los hilos esperan a que la sección crítica esté disponible usando condicion.wait() y luego realizan la tarea dentro de la sección crítica. De esta manera, podemos asegurarnos de que solo un hilo acceda a la sección crítica a la vez.
Una condición variable siempre va asociada a algún tipo de lock. éste puede ser provisto o se creará uno por defecto. Proveer uno es útil cuando varias variables de condición deben compartir el mismo lock. El lock es parte del objeto condicional: no es necesario rastrearlo por separado.
Una condición variable obedece el protocolo de gestión de contexto: al usar la declaración with se adquiere el lock asociado por la duración del bloque contenido. Los métodos acquire() y release() también llaman los métodos correspondientes del lock asociado.
Otros métodos deben llamarse con el lock asociado conservado. El método wait() libera el lock, y luego bloquea hasta que otro hilo lo despierte llamando notify() o notify_all(). Una vez que ha sido despertado, wait() re-adquiere el lock y retorna. También es posible especificar un tiempo de espera.
El método notify() despierta a uno de los hilos que esperan a la condición variable, si es que alguno espera. El método notify_all() despierta a todos los hilos que estén esperando a la condición variable.
Nota: Los métodos notify() y notify_all() no liberan el lock; esto significa que el hilo o los hilos que han sido despertados no retornaran de su llamada de wait() inmediatamente, sino solo una vez que el hilo que haya llamado a notify() o notify_all() renuncie finalmente a la propiedad del lock.
El estilo típico de programación con variables condicionales utiliza el lock para sincronizar el acceso a algún estado compartido; hilos que estén interesados en un cambio de estado en particular llamarán a wait() reiteradamente hasta que vean el estado deseado, mientras que los hilos que modifiquen el estado llamarán a notify() o a notify_all() cuando cambien el estado de modo que pudiera ser que el el estado sea el deseado por alguno de los hilos en espera. Por ejemplo, el siguiente código es una situación genérica de productor-consumidor con capacidad de búfer ilimitada:
# Consume one item
with cv:
while not an_item_is_available():
cv.wait()
get_an_available_item()
# Produce one item
with cv:
make_an_item_available()
cv.notify()
El bucle while que verifica la condición de la aplicación es necesario porque wait() puede retornar después de una cantidad arbitraria de tiempo, y la condición que dio pie a la llamada de notify() puede ya no ser verdadera. Esto es inherente a la programación multi-hilo. El método wait_for() puede usarse para automatizar la revisión de condiciones, y facilita la computación de tiempos de espera:
# Consume an item
with cv:
cv.wait_for(an_item_is_available)
get_an_available_item()
Para elegir entre notify() y notify_all(), considérese si un cambio de estado puede ser interesante para uno o varios hilos en espera. Por ejemplo en una típica situación productor-consumidor, agregar un elemento al búfer sólo necesita despertar un hilo consumidor.
class threading.Condition(lock=None)
Esta clase implementa objetos de condición variable. Una condición variable permite que uno o más hilos esperen hasta que sean notificados por otro hilo.
Si se provee un argumento lock distinto de None, debe ser un objeto Lock o RLock, y se utiliza como el lock subyacente. De otro modo, se crea un nuevo objeto RLock y se utiliza como el lock subyacente.
Distinto en la versión 3.3: cambiado de función de fábrica a una clase.
acquire(*args)
Adquiere el lock subyacente. Este método llama al método correspondiente sobre el lock subyacente; el valor de retorno es lo que retorne aquel método.
release()
Libera el lock subyacente. Este método llama al método correspondiente en el lock subyacente; no tiene valor de retorno.
wait(timeout=None)
Espera hasta ser notificado o hasta que el tiempo de espera caduque. Si el hilo invocador no ha adquirido el lock cuando este método es llamado, se lanza un RuntimeError.
Este método libera el lock subyacente, y luego bloquea hasta ser despertado por una llamada a notify() o notify_all() para la misma condición variable en otro hilo, o hasta que el tiempo de espera opcional se cumpla. Una vez que ha sido despertado o el tiempo de espera ha pasado, re-adquiere el lock y retorna.
Cuando haya un argumento timeout presente y no sea None, debe ser un número de punto flotante que especifique un tiempo de espera para la operación en segundos (o fracciones de segundo).
Cuando el lock subyacente es un RLock, no se libera utilizando su método release(), ya que esto podría no abrir realmente el lock cuando haya sido adquirido múltiples veces recursivamente. En cambio, se usa una interfaz interna de la clase RLock, que lo abre realmente incluso cuando haya sido adquirido múltiples veces recursivamente. Otra interfaz interna se usa luego para restablecer el nivel de recursividad cuando el lock es readquirido.
El valor de retorno es True a menos que un timeout dado haya expirado, en cuyo caso será False.
Distinto en la versión 3.2: Previamente, el método siempre retornaba None.
wait_for(predicate, timeout=None)
Espera a que una condición se evalúe como verdadera. predicate debe ser un invocable cuyo resultado se interpretará como un valor booleano. Se puede proveer un timeout que especifique el máximo tiempo de espera.
Este método utilitario puede llamar a wait() reiteradas veces hasta que se satisfaga el predicado, o hasta que la espera caduque. El valor de retorno es el último valor de retorno del predicado y se evaluará a False si el método ha caducado.
Al ignorar la propiedad feature, llamar a este método equivale vagamente a escribir:
while not predicate():
cv.wait()
Por ende, aplican las mismas reglas que con wait(): El lock debe ser conservado cuando se llame y es re-adquirido al momento del retorno. El predicado se evalúa con el lock conservado.
Nuevo en la versión 3.2.
notify(n=1)
Por defecto, despierta a un hilo que esté esperando por esta condición, si lo existe. Si el hilo llamador no ha adquirido el lock cuando se llama este método, se lanza un RuntimeError.
Este método despierta como máximo n de los hilos que estén esperando por la condición variable; no es una opción si no hay hilos esperando.
La implementación actual despierta exactamente n hilos, si hay por lo menos n hilos esperando. Sin embargo, no es seguro apoyarse en este comportamiento. A futuro, una implementación optimizada podría ocasionalmente despertar a más de n hilos.
Nota: un hilo que ha sido despertado no retorna realmente de su llamada a wait() hasta que pueda readquirir el lock. Ya que notify() no libera el lock, su llamador debiera hacerlo.
notify_all()
Despierta a todos los hilos que esperen por esta condición. Este método actúa como notify(), pero despierta a todos los hilos en espera en vez de a uno. Si el hilo llamador no ha adquirido el lock cuando se llama a este método, se lanza un RuntimeError.
El método notifyAll es un alias obsoleto para este método.