Below is the file 'icalinate.py' from this revision. You can also download the file.

#!/usr/bin/env python

#
# iCalinate : Expose simple information from an iCal
# file so other applications don't have to care.
#
# Copyright (C) 2007 Grahame Bowland
#

import StringIO
import datetime, time
import vobject
import heapq
import json
import sys

def _debug (s):
	"""Print s to standard error, prefixed with "debug:"."""
	print >> sys.stderr, "debug:", s

class EventWrapper(object):
	"""Steps through the occurrences of one calendar event, one at a time.

	vevent must provide getrruleset (addRDate=True) and getChildValue ().
	When getrruleset () returns a rule set the occurrences come from it;
	otherwise the event has exactly one occurrence, its 'dtstart'.

	offset, when truthy, is subtracted from every occurrence taken from the
	rule set.
	NOTE(review): the offset is not applied to the single 'dtstart' occurrence
	of a non-recurring event; this matches the historical behaviour -- confirm
	whether that is intended.

	Attributes:
	  next_occurrence -- the occurrence selected by the last recur () call,
	                     or None once the event is exhausted.
	  count           -- number of times recur () has been called.
	"""
	def __init__ (self, vevent, offset):
		self.vevent = vevent
		self.offset = offset
		self.count = 0
		rruleset = vevent.getrruleset (addRDate=True)
		if rruleset:
			self.rruleset_iter = iter (rruleset)
			# Defined up front so comparisons before the first recur ()
			# do not fail on a missing attribute.
			self.next_occurrence = None
		else:
			self.rruleset_iter = None
			self.next_occurrence = self.vevent.getChildValue ('dtstart')

	def recur (self):
		"""Advance to the next occurrence.

		Returns True when next_occurrence now holds an occurrence and False
		when the event has no further occurrences (next_occurrence is then
		None).
		"""
		self.count += 1
		if self.rruleset_iter is not None:
			# A finite recurrence eventually runs dry.  Report that as
			# "no more occurrences" instead of leaking StopIteration, which
			# would silently terminate any generator driving this wrapper.
			self.next_occurrence = next (self.rruleset_iter, None)
			if self.next_occurrence is not None and self.offset:
				self.next_occurrence -= self.offset
		elif self.next_occurrence is not None and self.count > 1:
			# A one-off event is exhausted after its first occurrence.
			self.next_occurrence = None
		return self.next_occurrence is not None

	def __cmp__ (self, other_event):
		return cmp (self.next_occurrence, other_event.next_occurrence)

	def __lt__ (self, other_event):
		# heapq orders its items with "<"; same ordering as __cmp__.
		return self.next_occurrence < other_event.next_occurrence

class EventOccurrence(object):
	"""One concrete occurrence of a calendar event.

	occurrence is when this particular occurrence starts; event is the
	underlying calendar component, which must provide getChildValue ().

	The duration is taken from the event's 'duration' value or, failing
	that, computed as 'dtend' - 'dtstart'.  It is None when neither is
	available.
	"""
	def __init__ (self, occurrence, event):
		self.occurrence = occurrence
		self.event = event
		self.__calced = False

	def __calc (self):
		"""Work out the duration on first use and remember the result."""
		if self.__calced:
			return
		self.__dtstart = self.event.getChildValue ('dtstart')
		self.__dtend = self.event.getChildValue ('dtend')
		self.__duration = self.event.getChildValue ('duration')
		if not self.__duration:
			if self.__dtstart and self.__dtend:
				self.__duration = self.__dtend - self.__dtstart
			else:
				self.__duration = None
		self.__calced = True

	def start (self):
		"""Return the start of this occurrence."""
		self.__calc ()
		return self.occurrence

	def end (self):
		"""Return the end of this occurrence, or None if the duration is unknown."""
		self.__calc ()
		if self.__duration is None:
			# No DTEND/DURATION on the event: adding None to the start
			# used to raise TypeError and take the whole summary down.
			return None
		return self.occurrence + self.__duration

	def duration (self):
		"""Return the event's duration, or None if it is unknown."""
		self.__calc ()
		return self.__duration

	def running (self, now=None):
		"""Return True if the occurrence has a non-zero duration and ends at or after now.

		now defaults to datetime.datetime.now ().  The start time is not
		consulted, so an occurrence that has not begun yet also counts.
		"""
		self.__calc ()
		if now is None:
			now = datetime.datetime.now ()
		if self.__duration and self.occurrence + self.__duration >= now:
			return True
		return False

	def later (self, now=None):
		"""Return True if the occurrence starts at or after now.

		now defaults to datetime.datetime.now ().
		"""
		self.__calc ()
		if now is None:
			now = datetime.datetime.now ()
		if self.occurrence >= now:
			return True
		return False

	def get (self, *args, **kwargs):
		"""Pass straight through to the underlying event's getChildValue ()."""
		return self.event.getChildValue (*args, **kwargs)


class Upcoming(object):
	"""Merges the occurrences of every event in a calendar into one ordered stream.

	ical_fd is handed to vobject.readOne (); offset is passed on to each
	EventWrapper.  The wrappers live in a heap (self.heap) ordered by their
	next occurrence, so iterating yields EventOccurrence objects earliest
	first.  Iteration consumes the heap: occurrences already yielded are not
	produced again by a later iteration.
	"""
	def __init__ (self, ical_fd, offset=None):
		self.ical = vobject.readOne (ical_fd)
		self.offset = offset
		self.heap = []
		# A calendar without any VEVENT has no vevent_list attribute at
		# all; treat that as "no events" rather than failing.
		for vevent in getattr (self.ical, 'vevent_list', ()):
			wrapped = EventWrapper (vevent, offset)
			if wrapped.recur ():
				heapq.heappush (self.heap, wrapped)

	def __iter__ (self):
		while self.heap:
			wrapped = heapq.heappop (self.heap)
			occurrence = EventOccurrence (wrapped.next_occurrence, wrapped.vevent)
			# Advance and re-queue the wrapper *before* yielding.  A
			# consumer that stops early abandons this generator at the
			# yield; re-queuing afterwards would lose the event (and all
			# of its later occurrences) from the heap for good.
			if wrapped.recur ():
				heapq.heappush (self.heap, wrapped)
			yield occurrence

class iCalinate(object):
	"""Summarises the upcoming events of one iCal source.

	ical_fd and offset are passed to Upcoming.  criteria is a sequence of
	EventOccurrence method names ("later", "running"); an event is kept only
	if every named method returns a true value for the current time.
	"""
	def __init__ (self, ical_fd, offset=None, criteria=("later", "running")):
		self.upcoming = Upcoming (ical_fd, offset=offset)
		self.criteria = criteria
		# Events already pulled from self.upcoming that still match.
		self.__iter_cache = []
		# Negated whole-second difference between time.time () and
		# mktime (gmtime ()); added to every timestamp produced below.
		# NOTE(review): mktime () already converts local time to an epoch
		# value, so this looks like a second correction -- confirm intent.
		self.back_to_utc = -1 * int (time.time () - time.mktime (time.gmtime ()))

	def __unix_from_datetime (self, dt):
		"""Convert dt to a float timestamp via mktime (), plus back_to_utc."""
		# dt will be in local time
		return time.mktime (dt.timetuple ()) + 1e-6 * dt.microsecond + self.back_to_utc

	def summary (self, nevents=10):
		"""Return up to nevents matching events as a list of dicts.

		Each dict has the keys 'start', 'duration', 'end', 'summary' and
		'description'.
		"""
		rv = []
		for idx, event in enumerate (self):
			if idx >= nevents:
				break
			entry = {}
			entry['start'] = event.start ()
			entry['duration'] = event.duration ()
			entry['end'] = event.end ()
			for attr in ('summary', 'description'):
				entry[attr] = event.get (attr)
			rv.append (entry)
		return rv

	def json_summary (self, *args, **kwargs):
		"""Return summary (*args, **kwargs) encoded as a JSON string.

		datetime values become float timestamps, text is passed through and
		every other value is converted with str ().
		"""
		rv = self.summary (*args, **kwargs)
		for entry in rv:
			for k in list (entry.keys ()):
				v = entry[k]
				if isinstance (v, datetime.datetime):
					entry[k] = self.__unix_from_datetime (v)
				elif not isinstance (v, type (u'')):
					# Unicode text is left alone: str () on it raises
					# UnicodeEncodeError for non-ASCII summaries.
					entry[k] = str (v)
		# JsonWriter only exists in the old python-json package; the
		# standard library module of the same name offers dumps () instead.
		if hasattr (json, 'JsonWriter'):
			return json.JsonWriter ().write (rv)
		return json.dumps (rv)

	def check_running (self, event, now):
		"""Return True if event satisfies every method named in self.criteria."""
		for fn_name in self.criteria:
			if not getattr (event, fn_name) (now):
				return False
		return True

	def __iter__ (self):
		"""Yield matching events: the still-valid cached ones, then new ones."""
		now = datetime.datetime.now ()
		# Drop the leading cached events that no longer match.
		old_count = 0
		for event in self.__iter_cache:
			if self.check_running (event, now):
				break
			else:
				old_count += 1
		self.__iter_cache = self.__iter_cache[old_count:]
		for event in self.__iter_cache:
			yield event
		# Then continue from the underlying stream, remembering matches so
		# the next iteration can replay them.
		for event in self.upcoming:
			if not self.check_running (event, now):
				continue
			self.__iter_cache.append (event)
			yield event

import httplib
import urlparse
import gzip

class UriWatchInstantiate:
	"""Keeps an object built from the body of an HTTP resource, re-fetching it periodically.

	uri            -- the resource to fetch (plain HTTP).
	instantiate_fn -- called with the response body; its result is what
	                  instance () hands out.
	refresh        -- timedelta that must elapse before the resource is
	                  fetched again; None selects the two hour default.

	The ETag and Last-Modified headers of the last successful response are
	sent back as If-None-Match / If-Modified-Since on later requests.
	"""
	def __init__ (self, uri, instantiate_fn, refresh=datetime.timedelta (hours=2)):
		self.uri, self.instantiate_fn = uri, instantiate_fn
		if refresh is None:
			refresh = datetime.timedelta (hours=2)
		self.refresh = refresh
		self.__instance, self.instance_dt = None, None
		self.site = urlparse.urlparse (self.uri)[1]
		self.etag, self.last_modified = None, None
		# HTTP status of the most recent get_data () call, if any.
		self.last_status = None

	def get_data (self):
		"""Fetch the resource; return its body, or None unless the status was 200.

		A gzip-encoded body is decompressed before it is returned.
		"""
		data = None
		self.last_status = None
		conn = httplib.HTTPConnection (self.site)
		try:
			extra_headers = { 'Accept-encoding' : 'gzip',
					  'User-agent' : 'iCalinate' }
			if self.etag is not None:
				extra_headers['If-None-Match'] = self.etag
			if self.last_modified is not None:
				extra_headers['If-Modified-Since'] = self.last_modified
			conn.request ("GET", self.uri, headers=extra_headers)
			r = conn.getresponse ()
			self.last_status = r.status
			if r.status == 200: # OK
				data = r.read ()
				if r.getheader ('content-encoding', None) == 'gzip':
					data = gzip.GzipFile (fileobj=StringIO.StringIO (data)).read ()
				self.last_modified = r.getheader ('last-modified')
				self.etag = r.getheader ('etag')
		finally:
			# Release the socket even when the request itself fails.
			conn.close ()
		return data

	def instance (self):
		"""Return the current object, rebuilding it first if a refresh is due.

		Returns None until a fetch has produced content.  When a fetch
		yields nothing the previously built object is kept.
		"""
		now = datetime.datetime.now ()
		if self.instance_dt is None or (now - self.instance_dt) > self.refresh:
			contents = self.get_data ()
			if contents:
				self.instance_dt = now
				self.__instance = self.instantiate_fn (contents)
			elif self.last_status == 304 and self.__instance is not None:
				# Not Modified: what we hold is still current, so restart
				# the refresh timer instead of asking again on every call.
				self.instance_dt = now
		return self.__instance

import web, urllib, cgi

# Routing table handed to web.run () at the bottom of the file:
# URL pattern followed by the name of the class that serves it.
urls = (
	'/', 'FrontPage',
	'/icalinate/(.*)', 'iCalinatePage',
)

# m is for meta
def make_uri_wi (uri, icalinate_kw, **kwargs):
	"""Return a UriWatchInstantiate for uri that builds iCalinate objects.

	icalinate_kw holds the keyword arguments given to iCalinate; any extra
	keyword arguments go to UriWatchInstantiate.
	"""
	def build (payload):
		try:
			return iCalinate (StringIO.StringIO (payload), **icalinate_kw)
		except vobject.base.ParseError:
			pass
		# The feed did not parse.  Throw away everything outside ASCII and
		# try once more -- crude and not strictly correct, but it rescues
		# at least some badly encoded feeds.
		ascii_only = ''.join (ch for ch in payload if ord (ch) < 128)
		return iCalinate (StringIO.StringIO (ascii_only), **icalinate_kw)
	return UriWatchInstantiate (uri, build, **kwargs)

# The feeds on offer, keyed by the name used in the /icalinate/<name> URL.
sources = {
	'rtrfm' : make_uri_wi (
		'http://work.papercutmedia.com/rtr/ical',
		{'offset' : datetime.timedelta (hours=-8)}),
	'uwaevents' : make_uri_wi (
		'http://events.uwa.edu.au/view/publicaffairs/ics/publicaffairs.ics',
		{'criteria' : ["later"]}),
	'dna' : make_uri_wi (
		'http://www.dnalounge.com/calendar/dnalounge.ics',
		{}),
}

class FrontPage:
	def __init__ (self):
		pass

	def GET (self):
		print "<html><head><title>iCalinate</title></head><body><p>Available feeds follow:</p>"
		for source in sources.keys ():
			print '<a href="icalinate/%s">%s</a><br />' % (urllib.quote (source), cgi.escape(source))
		print "</body></html>"

class iCalinatePage:
	def GET (self, source):
		if not sources.has_key (source):
			return web.notfound ()
		instance = sources[source].instance ()
		if instance:
			print instance.json_summary ()
		else:
			return web.internalerror () # fixme, is there a better code for this?

# Script entry point: pass the URL table and this module's globals to web.run ().
if __name__ == '__main__':
	web.run (urls, globals ())
# Disabled snippet for trying the summariser against a local file named 'ical':
#	nated = iCalinate ('ical', offset=datetime.timedelta(hours=-8))
#	import pprint
#	nated.json_summary ()