Below is the file 'goatpy/gen2consume.py' from this revision. You can also download the file.

#!/usr/bin/python

#
# a rewrite of scan.py to use better threading ideas.
#
# we have N threads that run plugins.
#
# we have a main thread that updates a condition (for queue)
# and threads ask for data when they want it
#
# thanks to Ian McKellar for spotting the empty list / None
# bug.

import threading
import time

class Queue:
	def __init__(self):
		self.items = []
		self.is_dead = False
	def is_empty(self):
		return self.items == []
	def append(self, new):
		self.items.append(new)
	def pop(self):
		if self.is_empty(): return None
		rv = self.items[0]
		self.items = self.items[1:]
		return rv

class RunGenerator(threading.Thread):
	def __init__(self, cond, queue, generator):
		threading.Thread.__init__(self)
		self.cond = cond
		self.queue = queue
		self.generator = generator
		self.setDaemon(True)
	def run(self):
		for i in self.generator:
			self.cond.acquire()
			self.queue.append(i)
			self.cond.notify()
			self.cond.release()
		self.queue.is_dead = True
		# ok, send each thread a spurious notify. they'll
		# each notice that there is nothing in the queue and exit
		self.cond.acquire()
		self.cond.notifyAll()
		self.cond.release()
		# wait for the queue to become empty
		while not self.queue.is_empty():
			time.sleep(1)

class RunFunction(threading.Thread):
	def __init__(self, cond, queue, function, id):
		threading.Thread.__init__(self, name="RunFunction id %s" % (id))
		self.cond = cond
		self.queue = queue
		self.function = function
		self.id = id
		self.setDaemon(True)
		self.status_string = ""
	def run(self):
		# if we wake up after calling wait()
		while 1:
			self.cond.acquire()
			while self.queue.is_empty() and not self.queue.is_dead:
				self.cond.wait()
			# if we woke up and nothing is in the queue, we have been
			# requested to exit
			if self.queue.is_empty():
				self.cond.release()
				return
			data = self.queue.pop()
			self.cond.release()
			self.function(self, data)

runner_threads = 10

def gen2consume(generator, max_threads, thread_function):
	cond = threading.Condition()
	queue = Queue()
	generator_thread = RunGenerator(cond, queue, generator)
	generator_thread.start()
	runners = [RunFunction(cond, queue, thread_function, t) for t in range(max_threads)]
	map(lambda x: x.start(), runners)
	return generator_thread

if __name__ == '__main__':
	def genthing():
		for i in range(100):
			yield(i * i)

	def display(mythread, i):
		print mythread.id, "got", i
		time.sleep(1)

	gt = gen2consume(genthing(), 7, display)
	gt.join()