Below is the file 'goatpy/gen2consume.py' from this revision. You can also download the file.
#!/usr/bin/python
#
# a rewrite of scan.py to use better threading ideas.
#
# we have N threads that run plugins.
#
# we have a main thread that updates a condition (for queue)
# and threads ask for data when they want it
#
# thanks to Ian McKellar for spotting the empty list / None
# bug.

import threading
import time
from collections import deque


class Queue(object):
    """Minimal FIFO shared between the producer and the worker threads.

    The class does no locking of its own; in this module every access is
    made while holding the shared ``threading.Condition``.
    """

    def __init__(self):
        # deque gives O(1) removal from the left end; the previous list
        # implementation re-copied the whole list on every pop().
        self.items = deque()
        # Set to True by the producer once no more items will be appended.
        self.is_dead = False

    def is_empty(self):
        """Return True when no items are queued."""
        return not self.items

    def append(self, new):
        """Add ``new`` at the tail of the queue."""
        self.items.append(new)

    def pop(self):
        """Remove and return the oldest item, or None if the queue is empty."""
        if self.is_empty():
            return None
        return self.items.popleft()


class RunGenerator(threading.Thread):
    """Producer thread: feeds every value of ``generator`` into ``queue``.

    After the generator is exhausted (or raises) the queue is marked dead,
    all waiting workers are woken, and the thread stays alive until the
    queue has been drained, so ``join()`` returns only once every item has
    been handed to a worker.
    """

    def __init__(self, cond, queue, generator):
        threading.Thread.__init__(self)
        self.cond = cond
        self.queue = queue
        self.generator = generator
        self.daemon = True

    def run(self):
        try:
            for item in self.generator:
                with self.cond:
                    self.queue.append(item)
                    self.cond.notify()
        finally:
            # Runs even when the generator raises: otherwise is_dead would
            # never be set and every worker would block in wait() forever.
            with self.cond:
                self.queue.is_dead = True
                # Wake every worker; those that find the queue empty exit.
                self.cond.notify_all()
                # Wait for the queue to become empty.  Workers notify when
                # they drain it; the timeout keeps the old once-a-second
                # polling as a fallback for consumers that do not notify.
                while not self.queue.is_empty():
                    self.cond.wait(1.0)


class RunFunction(threading.Thread):
    """Worker thread: repeatedly pops an item and calls ``function(self, item)``.

    ``id`` is stored on the thread and used in the thread name, so the
    callback can tell workers apart.
    """

    def __init__(self, cond, queue, function, id):
        threading.Thread.__init__(self, name="RunFunction id %s" % (id))
        self.cond = cond
        self.queue = queue
        self.function = function
        self.id = id
        self.daemon = True
        # Initialised empty; not read anywhere in this file.
        self.status_string = ""

    def run(self):
        while True:
            with self.cond:
                # Sleep until there is work or the producer has finished.
                while self.queue.is_empty() and not self.queue.is_dead:
                    self.cond.wait()

                # Woken with nothing queued: the producer is done and we
                # have been requested to exit.
                if self.queue.is_empty():
                    return

                data = self.queue.pop()

                # Last item of a finished producer: wake the producer
                # thread, which is waiting for the queue to drain.
                if self.queue.is_dead and self.queue.is_empty():
                    self.cond.notify_all()

            # Run the callback outside the lock so workers overlap.
            self.function(self, data)


runner_threads = 10


def gen2consume(generator, max_threads, thread_function):
    """Consume ``generator`` with ``max_threads`` worker threads.

    Each generated value is passed to ``thread_function(worker, value)``
    on one of the workers.

    Returns the producer thread.  Joining it waits until every value has
    been handed to a worker; the workers themselves are available as the
    ``runners`` attribute of the returned thread, so callers can join
    them to wait for the callbacks still in progress.
    """
    cond = threading.Condition()
    queue = Queue()

    generator_thread = RunGenerator(cond, queue, generator)
    generator_thread.start()

    runners = [RunFunction(cond, queue, thread_function, t)
               for t in range(max_threads)]
    # An explicit loop: map() is lazy on Python 3 and would never start
    # the threads.
    for runner in runners:
        runner.start()

    generator_thread.runners = runners
    return generator_thread


if __name__ == '__main__':
    def genthing():
        for i in range(100):
            yield i * i

    def display(mythread, i):
        # Single-argument print() gives the same output on Python 2 and 3.
        print("%s got %s" % (mythread.id, i))
        time.sleep(1)

    gt = gen2consume(genthing(), 7, display)
    gt.join()
    # The workers are daemon threads: join them too so the callbacks still
    # running when the queue empties are not killed at interpreter exit.
    for worker in gt.runners:
        worker.join()