Una pequeña receta sobre el patrón ThreadPool y su implementación en Python.
ThreadPool es un patrón de diseño habitual en aplicaciones concurrentes y de comunicaciones. La idea es tener un «pool» de hilos (workers) que pueden realizar trabajos. Estos trabajos se le dan por medio de una cola FIFO. El usuario añade trabajos a la cola y cuando un hilo está libre extrae y procesa un trabajo de la cola.
Lo interesante es que el número de hilos se especifica cuando se crea el «pool» y no se crean ni destruyen hilos durante la ejecución de la aplicación. Con eso se consigue una aplicación más escalable, predecible y sin la sobrecarga que implica la creación y destrucción de recursos. A parte de las ventajas que implica el proceso en paralelo, este patrón permite tener un código más limpio y mejor estructurado para el cliente.
La implementación que proponemos es una modificación/mejora de http://code.activestate.com/recipes/203871/. Se trata de una clase con los siguientes métodos “públicos”:
El cliente simplemente tiene que añadir trabajos (funciones con argumentos) usando el método add(). Para terminar utiliza el méodo join() que obligará al pool a acabar impidiendo al mismo tiempo que se puedan añadir nuevos trabajos.
pool = ThreadPool(5) pool.add(f1, [arg1, arg2]) [. . .] pool.join()
A continuación aparece la implementación de la clase, aunque puedes descargar una versión actualizada de https://arco.esi.uclm.es/svn/public/prj/atheist/pyarco/Thread.py
class ThreadPool: class JoiningEx: pass """ Flexible thread pool class. Creates a pool of threads, then accepts tasks that will be dispatched to the next available thread """ def init(self, numThreads): """Initialize the thread pool with numThreads workers """ self.threads = [] self.__resizeLock = threading.Lock() self.__taskLock = threading.Condition(threading.Lock()) self.__tasks = [] self.__isJoining = False self.resize(numThreads) def resize(self, newsize): """ public method to set the current pool size """ if self.__isJoining: raise ThreadPool.JoiningEx() with self.__resizeLock: self.__resize(newsize) return True def __resize(self, newsize): """Set the current pool size, spawning or terminating threads if necessary. Internal use only; assumes the resizing lock is held.""" diff = newsize – len(self.__threads) # If we need to grow the pool, do so for i in range(diff): self.__threads.append(ThreadPool.Worker(self)) # If we need to shrink the pool, do so for i in range(-diff): thread = self.__threads.pop() thread.stop = True def __len(self): """Return the number of threads in the pool.""" with self.resizeLock: return len(self.__threads) def add(self, task, args=None, taskCallback=None): """Insert a task into the queue. task must be callable; args and taskCallback can be None.""" assert callable(task) if self.__isJoining: raise ThreadPool.JoiningEx() with self.__taskLock: self.__tasks.append((task, args, taskCallback)) self.__taskLock.notify() return True def nextTask(self): """ Retrieve the next task from the task queue. For use only by ThreadPoolWorker objects contained in the pool """ with self.__taskLock: while not self.__tasks: if self.__isJoining: raise ThreadPool.JoiningEx() self.__taskLock.wait() assert self.__tasks return self.__tasks.pop(0) def join(self, waitForTasks=True, waitForThreads=True): """ Clear the task queue and terminate all pooled threads, optionally allowing the tasks and threads to finish """ self.__isJoining = True # prevent more task queueing if waitForTasks: while self.__tasks: time.sleep(0.1) with self.__resizeLock: if waitForThreads: with self.__taskLock: self.__taskLock.notifyAll() for t in self.__threads: t.join() # ready to reuse del self.__threads[:] self.__isJoining = False class Worker(threading.Thread): """ Pooled thread class """ def __init(self, pool): """ Initialize the thread and remember the pool. """ threading.Thread.init(self) self.__pool = pool self.stop = False self.start() def run(self): """ Until told to quit, retrieve the next task and execute it, calling the callback if any. """ while not self.stop: try: cmd, args, callback = self.__pool.nextTask() except ThreadPool.JoiningEx: break logging.debug("thread %s taken %s" % (self, cmd)) result = cmd(*args) if callback: callback(result)
Próximamente la versión C++ del mismo invento.
Comments
ThreadPool en la librería estándar
La librería estándar de python-2.7 viene con una implementación de thread-pool en
multiprocessing.pool.ThreadPool. Lo interesante además es que tiene la misma interfaz quemultiprocessing.Poolde modo que se pueden hacer programas con pool de hilos o pool de procesos sin más que cambiar la clase a instanciar.No soy portavoz de ningún colectivo, grupo o facción. Mi opinión es personal e intransferible.
Error
A día de hoy, el método add de la clase ThreadPool no permite obviar el segundo parámetro (la lista de argumentos que pasar a la función) porque no acepta None (el valor por defecto).
Sería buena idea añadir un bug tacker para este tipo de librerías.
Aunque un poco tarde recojo
Aunque un poco tarde
recojo tu sugerencia. Esta clase está ahora en un módulo Python que se llama pyarco (aunque es probable que cambie de nombre próximamente), concretamente en el submódulo Thread. En el proyecto hay un issue tracker para informar de bugs:
https://arco.esi.uclm.es:3000/projects/pyarco
No soy portavoz de ningún colectivo, grupo o facción. Mi opinión es personal e intransferible.
Anybody can explain the below
Anybody can explain the below snippet of code extracted from above code.
def resize(self, newsize):
""" public method to set the current pool size """
if self.__isJoining:
raise ThreadPool.JoiningEx()
with self.__resizeLock:
self.__resize(newsize)
Is is possible to implement is in another alternate way.
Waiting for responses.
David Mayer
spam?
It is obvious that spambots are becoming more and more sophisticated every day.
Whoa, that sounded pretty much like the introduction to some paper's abstract.
Nacho
mmm...
....yo creo que sí... porque además los libros esos no parecen funcionar... o a lo mejor es esta conexion xurrera q tengo aquí...
------------------------------------------------------------
$ python -c "print 'VG9udG8gZWwgcXVlIGxvIGxlYSA6KQ==\n'.decode('base64')"
------------------------------------------------------------
ale, sin link ya no puede ser
ale, sin link ya no puede ser spam
No soy portavoz de ningún colectivo, grupo o facción. Mi opinión es personal e intransferible.