Below is the file 'icalinate.py' from this revision. You can also download the file.
#!/usr/bin/env python
#
# iCalinate : Expose simple information from an iCal
# file so other applications don't have to care.
#
# Copyright (C) 2007 Grahame Bowland
#

import StringIO
import datetime, time
import vobject
import heapq
import json
import sys

# Types that json_summary() treats as "already text" and leaves untouched.
# ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere.
try:
    _TEXT_TYPES = (basestring,)
except NameError:
    _TEXT_TYPES = (str,)


class EventWrapper(object):
    """Pair a VEVENT with a cursor over its occurrences.

    ``next_occurrence`` holds the occurrence the cursor currently points at
    (with ``offset`` subtracted when an offset is given), or ``None`` once the
    recurrence set is exhausted.  Wrappers compare by ``next_occurrence`` so
    they can be stored in a heap.
    """

    def __init__(self, vevent, offset):
        """Wrap ``vevent`` and position the cursor on its first occurrence.

        vevent -- object providing ``getrruleset(addRDate=True)``
        offset -- ``datetime.timedelta`` subtracted from every occurrence,
                  or a false value for no adjustment
        """
        self.vevent = vevent
        self.offset = offset
        self.rruleset_iter = iter(vevent.getrruleset(addRDate=True))
        self.recur()

    def recur(self):
        """Advance to the next occurrence.

        Returns True when another occurrence exists, False when the
        recurrence set is exhausted (``next_occurrence`` is then ``None``).
        """
        # An exhausted iterator raises StopIteration rather than returning
        # None; inside Upcoming.__iter__ (a generator) that exception used to
        # end iteration of *all* events.  Ask for an explicit default instead.
        self.next_occurrence = next(self.rruleset_iter, None)
        if self.next_occurrence is not None and self.offset:
            self.next_occurrence -= self.offset
        return self.next_occurrence is not None

    def __cmp__(self, other_event):
        return cmp(self.next_occurrence, other_event.next_occurrence)

    def __lt__(self, other_event):
        # Same ordering as __cmp__, for interpreters where heapq relies on
        # rich comparisons only.
        return self.next_occurrence < other_event.next_occurrence


class EventOccurrence(object):
    """A single occurrence of an event, plus lazily computed duration data."""

    def __init__(self, occurrence, event):
        """
        occurrence -- start of this occurrence (as produced by EventWrapper)
        event      -- the underlying VEVENT; must provide ``getChildValue``
        """
        self.occurrence = occurrence
        self.event = event
        self.__calced = False

    def __calc(self):
        """Read dtstart/dtend/duration from the event once and cache them."""
        if self.__calced:
            return
        self.__dtstart = self.event.getChildValue('dtstart')
        self.__dtend = self.event.getChildValue('dtend')
        self.__duration = self.event.getChildValue('duration')
        if not self.__duration:
            # No explicit (or zero) duration: derive it from dtstart/dtend
            # when both are present, otherwise record it as unknown.
            if self.__dtstart and self.__dtend:
                self.__duration = self.__dtend - self.__dtstart
            else:
                self.__duration = None
        self.__calced = True

    def start(self):
        """Return the start of this occurrence."""
        self.__calc()
        return self.occurrence

    def end(self):
        """Return the end of this occurrence, or None if no duration is known."""
        self.__calc()
        if self.__duration is None:
            # Adding None to the occurrence would raise TypeError; mirror
            # duration(), which also reports None in this situation.
            return None
        return self.occurrence + self.__duration

    def duration(self):
        """Return the duration of the event, or None if it cannot be derived."""
        self.__calc()
        return self.__duration

    def on_or_later(self, now=None):
        """Return True if the occurrence starts, or is still running, at ``now``.

        now -- reference ``datetime.datetime``; defaults to
               ``datetime.datetime.now()``
        """
        self.__calc()
        if now is None:
            now = datetime.datetime.now()
        if self.occurrence >= now:
            return True
        if self.__duration and self.occurrence + self.__duration >= now:
            return True
        return False

    def get(self, *args, **kwargs):
        """Forward to the underlying event's ``getChildValue``."""
        return self.event.getChildValue(*args, **kwargs)


class Upcoming(object):
    """Iterate the occurrences of every VEVENT in a calendar in heap order.

    Iteration consumes the internal heap: each ``iter()`` call continues from
    where the previous one stopped.
    """

    def __init__(self, ical_fd, offset=None):
        """
        ical_fd -- source handed to ``vobject.readOne``
        offset  -- optional ``datetime.timedelta`` passed to each EventWrapper
        """
        self.ical = vobject.readOne(ical_fd)
        self.offset = offset
        self.heap = []
        # getattr with a default: a calendar without any VEVENT children then
        # yields an empty feed instead of failing on attribute lookup.
        for vevent in getattr(self.ical, 'vevent_list', []):
            wrapped = EventWrapper(vevent, offset)
            # An event with no occurrences at all has nothing to schedule.
            if wrapped.next_occurrence is not None:
                heapq.heappush(self.heap, wrapped)

    def __iter__(self):
        while self.heap:
            wrapped = heapq.heappop(self.heap)
            current = EventOccurrence(wrapped.next_occurrence, wrapped.vevent)
            # Re-queue the event *before* yielding.  If the consumer stops
            # iterating (e.g. summary() breaking out of its loop) the
            # generator is closed at the yield, and an event re-queued only
            # afterwards would lose all of its later recurrences.
            if wrapped.recur():
                heapq.heappush(self.heap, wrapped)
            yield current


class iCalinate(object):
    """Expose the upcoming events of a calendar as plain dicts or JSON."""

    def __init__(self, *args, **kwargs):
        """Arguments are passed straight through to ``Upcoming``."""
        self.upcoming = Upcoming(*args, **kwargs)
        # Occurrences already pulled from self.upcoming that may still be
        # current; self.upcoming cannot be rewound, so they are kept here.
        self.__iter_cache = []
        # Negated whole-second difference between time.time() and
        # time.mktime(time.gmtime()).
        # NOTE(review): looks like a local-time/UTC correction -- confirm.
        self.back_to_utc = -1 * int(time.time() - time.mktime(time.gmtime()))

    def __unix_from_datetime(self, dt):
        """Convert ``dt`` to a float timestamp, shifted by ``back_to_utc``."""
        # dt will be in local time
        return time.mktime(dt.timetuple()) + 1e-6 * dt.microsecond + self.back_to_utc

    def summary(self, nevents=10):
        """Return a list of up to ``nevents`` dicts describing current events.

        Each dict has the keys 'start', 'duration', 'end', 'summary' and
        'description'.
        """
        rv = []
        for idx, event in enumerate(self):
            if idx >= nevents:
                break
            entry = {}
            entry['start'] = event.start()
            entry['duration'] = event.duration()
            entry['end'] = event.end()
            for attr in ('summary', 'description'):
                entry[attr] = event.get(attr)
            rv.append(entry)
        return rv

    def json_summary(self, *args, **kwargs):
        """Print ``summary(*args, **kwargs)`` as JSON.

        ``datetime.datetime`` values become float timestamps, text values are
        left as they are, and everything else is passed through ``str()``.
        The document is printed; nothing is returned.
        """
        rv = self.summary(*args, **kwargs)
        for entry in rv:
            for key, value in list(entry.items()):
                if isinstance(value, datetime.datetime):
                    entry[key] = self.__unix_from_datetime(value)
                elif not isinstance(value, _TEXT_TYPES):
                    # str() on a Python 2 unicode value containing non-ASCII
                    # characters raises UnicodeEncodeError, so text is left
                    # for the JSON encoder to handle.
                    entry[key] = str(value)
        # The standard-library json module offers dumps() but no JsonWriter;
        # the older python-json package offers JsonWriter.  Support both.
        if hasattr(json, 'dumps'):
            encoded = json.dumps(rv)
        else:
            encoded = json.JsonWriter().write(rv)
        print(encoded)

    def __iter__(self):
        """Yield cached still-current occurrences, then fresh ones."""
        now = datetime.datetime.now()
        # Drop the leading cached occurrences that are over by now.
        old_count = 0
        for event in self.__iter_cache:
            if event.on_or_later(now=now):
                break
            else:
                old_count += 1
        self.__iter_cache = self.__iter_cache[old_count:]
        for event in self.__iter_cache:
            yield event
        for event in self.upcoming:
            if not event.on_or_later(now):
                continue
            self.__iter_cache.append(event)
            yield event


class UriWatchInstantiate:
    """Build an object from fetched data and rebuild it after ``refresh``."""

    def __init__(self, uri, instantiate_fn, refresh=None):
        """
        uri            -- identifier of the data source (stored only; see
                          get_data)
        instantiate_fn -- callable taking the fetched contents and returning
                          the object to hand out
        refresh        -- ``datetime.timedelta`` between rebuilds; defaults
                          to two hours
        """
        self.uri = uri
        self.instantiate_fn = instantiate_fn
        self.refresh = refresh
        if refresh is None:
            self.refresh = datetime.timedelta(hours=2)
        self.__instance = None
        self.instance_dt = None

    def get_data(self):
        """Return the raw calendar data.

        NOTE: this reads the local file 'rtr.ical'; ``self.uri`` is not used.
        """
        fd = open('rtr.ical')
        try:
            return fd.read()
        finally:
            # The handle used to be left for the garbage collector to close.
            fd.close()

    def instance(self):
        """Return the current object, rebuilding it when it is stale.

        If fetching yields no contents the previous object (``None`` when
        nothing has been built yet) is returned unchanged.
        """
        now = datetime.datetime.now()
        if self.instance_dt is None or (now - self.instance_dt) > self.refresh:
            contents = self.get_data()
            if contents:
                self.instance_dt = now
                self.__instance = self.instantiate_fn(contents)
        return self.__instance


import web, urllib, cgi

# URL pattern / handler class name pairs handed to web.run().
urls = (
    '/', 'FrontPage',
    '/icalinate/(.*)', 'iCalinatePage'
)

# m is for meta
def make_uri_wi(uri, icalinate_kw, **kwargs):
    """Return a UriWatchInstantiate that builds iCalinate objects.

    uri          -- passed to UriWatchInstantiate
    icalinate_kw -- keyword arguments for every iCalinate that gets built
    kwargs       -- extra keyword arguments for UriWatchInstantiate
    """
    def make_icalinate_cb(contents):
        return iCalinate(StringIO.StringIO(contents), **icalinate_kw)
    return UriWatchInstantiate(uri, make_icalinate_cb, **kwargs)


# Feeds served by this application, keyed by the name used in the URL.
sources = {
    'rtrfm' : make_uri_wi('http://work.papercutmedia.com/rtr/ical',
                          {'offset' : datetime.timedelta(hours=-8)})
}


class FrontPage:
    """Index page listing the available feeds."""

    def __init__(self):
        pass

    def GET(self):
        # Output is produced with print, as everywhere else in this file.
        # NOTE(review): presumably web.py captures stdout for the response
        # body -- confirm against the web.py version in use.
        print("<html><head><title>iCalinate</title></head><body><p>Available feeds follow:</p>")
        for source in sources.keys():
            print('<a href="icalinate/%s">%s</a><br />' % (urllib.quote(source), cgi.escape(source)))
        print("</body></html>")


class iCalinatePage:
    """JSON summary of a single feed."""

    def GET(self, source):
        if source not in sources:
            return web.notfound()
        # json_summary() prints the document itself and returns None;
        # printing its return value appended the text "None" to the response.
        sources[source].instance().json_summary()


if __name__ == '__main__':
    web.run(urls, globals())
#    nated = iCalinate ('ical', offset=datetime.timedelta(hours=-8))
#    import pprint
#    nated.json_summary ()