# Below is the file 'icalinate.py' from this revision. You can also download the file.
#!/usr/bin/env python
#
# iCalinate : Expose simple information from an iCal
# file so other applications don't have to care.
#
# Copyright (C) 2007 Grahame Bowland
#
"""Serve JSON summaries of upcoming events read from remote iCal feeds.

The module parses iCal data with vobject, merges the occurrences of all
events into one ordered stream (``Upcoming``), filters that stream by
criteria such as "later" (``iCalinate``), caches downloaded feeds
(``UriWatchInstantiate``) and exposes the result through web.py handlers.

This is Python 2 code (StringIO, httplib, urlparse, old-style web.py).
"""

import StringIO
import cgi
import datetime
import gzip
import heapq
import httplib
import json
import sys
import time
import urllib
import urlparse

import vobject
import web


def _debug(s):
    """Write a debug message for *s* to standard error."""
    sys.stderr.write("debug: %s\n" % (s,))


def _json_encode(obj):
    """Serialise *obj* to a JSON string.

    Uses ``json.JsonWriter`` when the imported ``json`` module provides it
    (the API the original code was written against) and falls back to
    ``json.dumps`` otherwise.
    """
    writer_cls = getattr(json, 'JsonWriter', None)
    if writer_cls is not None:
        return writer_cls().write(obj)
    return json.dumps(obj)


class EventWrapper(object):
    """Track the next occurrence of one VEVENT so events can share a heap.

    Attributes:
        vevent: the wrapped vobject event.
        offset: optional timedelta subtracted from every occurrence that
            comes out of the recurrence rule set.
        count: number of times ``recur`` has been called.
        next_occurrence: start of the current occurrence, or None once
            the event has no further occurrences.
    """

    def __init__(self, vevent, offset):
        self.vevent = vevent
        self.offset = offset
        self.count = 0
        self.next_occurrence = None
        rruleset = vevent.getrruleset(addRDate=True)
        if rruleset is not None:
            self.rruleset_iter = iter(rruleset)
        else:
            # No recurrence information: the event happens once, at dtstart.
            # NOTE(review): offset is only applied to rule-generated
            # occurrences (see recur), not to this value -- confirm intended.
            self.next_occurrence = self.vevent.getChildValue('dtstart')
            self.rruleset_iter = None

    def recur(self):
        """Advance to the next occurrence.

        Returns:
            True if ``next_occurrence`` now holds an occurrence, False if
            the event is exhausted.
        """
        self.count += 1
        if self.rruleset_iter is not None:
            try:
                self.next_occurrence = self.rruleset_iter.next()
            except StopIteration:
                # A finite rule set ran out.  Letting StopIteration escape
                # would silently terminate any generator calling recur().
                self.next_occurrence = None
                return False
            if self.next_occurrence and self.offset:
                self.next_occurrence -= self.offset
        elif self.next_occurrence is not None and self.count > 1:
            # A one-off event is consumed after its first occurrence.
            self.next_occurrence = None
        return self.next_occurrence is not None

    def __cmp__(self, other_event):
        """Order wrappers by their next occurrence (used by heapq)."""
        return cmp(self.next_occurrence, other_event.next_occurrence)

    def __lt__(self, other_event):
        """Rich-comparison counterpart of ``__cmp__`` for heap ordering."""
        return self.next_occurrence < other_event.next_occurrence


class EventOccurrence(object):
    """A single occurrence of an event.

    Attributes:
        occurrence: start of this occurrence.
        event: the vobject event the occurrence belongs to.
    """

    def __init__(self, occurrence, event):
        self.occurrence = occurrence
        self.event = event
        self.__calced = False

    def __calc(self):
        """Read dtstart/dtend/duration from the event once and cache them."""
        if self.__calced:
            return
        self.__dtstart = self.event.getChildValue('dtstart')
        self.__dtend = self.event.getChildValue('dtend')
        self.__duration = self.event.getChildValue('duration')
        if not self.__duration:
            # No explicit duration: derive it from dtend - dtstart if both
            # are present, otherwise the duration is unknown (None).
            if self.__dtstart and self.__dtend:
                self.__duration = self.__dtend - self.__dtstart
            else:
                self.__duration = None
        self.__calced = True

    def start(self):
        """Return the start of this occurrence."""
        self.__calc()
        return self.occurrence

    def end(self):
        """Return the end of this occurrence, or None if no duration is known."""
        self.__calc()
        if self.__duration is None:
            return None
        return self.occurrence + self.__duration

    def duration(self):
        """Return the event duration, or None if it cannot be determined."""
        self.__calc()
        return self.__duration

    def running(self, now=None):
        """Return True if the event has a duration and ends at or after *now*.

        *now* defaults to ``datetime.datetime.now()``.  Note that this does
        not check whether the occurrence has already started.
        """
        self.__calc()
        if now is None:
            now = datetime.datetime.now()
        if self.__duration and self.occurrence + self.__duration >= now:
            return True
        return False

    def later(self, now=None):
        """Return True if the occurrence starts at or after *now*.

        *now* defaults to ``datetime.datetime.now()``.
        """
        self.__calc()
        if now is None:
            now = datetime.datetime.now()
        return self.occurrence >= now

    def get(self, *args, **kwargs):
        """Forward to ``getChildValue`` on the underlying event."""
        return self.event.getChildValue(*args, **kwargs)


class Upcoming(object):
    """Iterate over the occurrences of every event in a calendar, in order.

    Iteration consumes the internal heap: each occurrence is produced once.
    """

    def __init__(self, ical_fd, offset=None):
        """Parse the calendar in *ical_fd* and queue every event's first occurrence.

        *offset* is handed to each ``EventWrapper``.
        """
        self.ical = vobject.readOne(ical_fd)
        self.offset = offset
        self.heap = []
        # A calendar without any VEVENT has no vevent_list attribute.
        for vevent in getattr(self.ical, 'vevent_list', []):
            wrapped = EventWrapper(vevent, offset)
            if wrapped.recur():
                heapq.heappush(self.heap, wrapped)

    def __iter__(self):
        """Yield ``EventOccurrence`` objects, earliest first."""
        while self.heap:
            wrapped = heapq.heappop(self.heap)
            occurrence = EventOccurrence(wrapped.next_occurrence, wrapped.vevent)
            # Advance and re-queue *before* yielding: if the consumer stops
            # iterating at this point the generator is abandoned, and code
            # after the yield would never run, dropping the event for good.
            if wrapped.recur():
                heapq.heappush(self.heap, wrapped)
            yield occurrence


class iCalinate(object):
    """Filtered, cached view of the upcoming events of one calendar."""

    def __init__(self, ical_fd, offset=None, criteria=None):
        """Build the view.

        Args:
            ical_fd: file-like object containing the iCal data.
            offset: optional timedelta passed on to ``Upcoming``.
            criteria: names of ``EventOccurrence`` predicate methods that
                must all hold for an event to be reported.  Defaults to
                ``["later", "running"]``.
        """
        self.upcoming = Upcoming(ical_fd, offset=offset)
        if criteria is None:
            # Built per instance so instances never share a default list.
            criteria = ["later", "running"]
        self.criteria = criteria
        self.__iter_cache = []
        # Negated whole-second difference between time.time() and
        # time.mktime(time.gmtime()); added to every timestamp produced by
        # __unix_from_datetime.
        self.back_to_utc = -1 * int(time.time() - time.mktime(time.gmtime()))

    def __unix_from_datetime(self, dt):
        """Convert *dt* to a float timestamp, shifted by ``back_to_utc``."""
        # dt will be in local time
        return time.mktime(dt.timetuple()) + 1e-6 * dt.microsecond + self.back_to_utc

    def summary(self, nevents=10):
        """Return a list of dicts describing up to *nevents* matching events.

        Each dict has the keys 'start', 'duration', 'end', 'summary' and
        'description'.
        """
        rv = []
        for idx, event in enumerate(self):
            if idx >= nevents:
                break
            entry = {}
            entry['start'] = event.start()
            entry['duration'] = event.duration()
            entry['end'] = event.end()
            for attr in ('summary', 'description'):
                entry[attr] = event.get(attr)
            rv.append(entry)
        return rv

    def json_summary(self, *args, **kwargs):
        """Return ``summary(*args, **kwargs)`` encoded as a JSON string.

        datetime values become timestamps; text values are kept as they
        are; everything else (including None) is converted with ``str``.
        """
        rv = self.summary(*args, **kwargs)
        for entry in rv:
            for k in list(entry.keys()):
                v = entry[k]
                if isinstance(v, datetime.datetime):
                    entry[k] = self.__unix_from_datetime(v)
                elif not isinstance(v, basestring):
                    # Text is left untouched: str() on a unicode value with
                    # non-ASCII characters raises UnicodeEncodeError.
                    entry[k] = str(v)
        return _json_encode(rv)

    def check_running(self, event, now):
        """Return True if *event* satisfies every criterion at time *now*."""
        for fn_name in self.criteria:
            if not getattr(event, fn_name)(now):
                return False
        return True

    def __iter__(self):
        """Yield matching events: cached ones first, then fresh ones.

        Cached events at the front of the cache that no longer satisfy the
        criteria are dropped before anything is yielded.
        """
        now = datetime.datetime.now()
        old_count = 0
        for event in self.__iter_cache:
            if self.check_running(event, now):
                break
            old_count += 1
        self.__iter_cache = self.__iter_cache[old_count:]
        for event in self.__iter_cache:
            yield event
        for event in self.upcoming:
            if not self.check_running(event, now):
                continue
            self.__iter_cache.append(event)
            yield event


class UriWatchInstantiate:
    """Download a URI on demand and keep an object built from its contents.

    Attributes:
        uri: the address that is fetched.
        instantiate_fn: callable turning the downloaded body into the
            cached object.
        refresh: timedelta after which the cached object is re-checked.
        instance_dt: time of the last successful refresh, or None.
        etag, last_modified: validators from the last 200 response.
        last_status: HTTP status of the most recent response, or None.
    """

    def __init__(self, uri, instantiate_fn, refresh=datetime.timedelta(hours=2)):
        self.uri, self.instantiate_fn = uri, instantiate_fn
        # Fall back to the default interval when a falsy refresh is given.
        self.refresh = refresh or datetime.timedelta(hours=2)
        self.__instance, self.instance_dt = None, None
        self.site = urlparse.urlparse(self.uri)[1]
        self.etag, self.last_modified = None, None
        self.last_status = None

    def get_data(self):
        """Fetch the URI with a conditional GET.

        Returns:
            The (gunzipped, if necessary) response body on a 200 response,
            otherwise None.
        """
        data = None
        conn = httplib.HTTPConnection(self.site)
        try:
            extra_headers = {'Accept-encoding': 'gzip', 'User-agent': 'iCalinate'}
            if self.etag is not None:
                extra_headers['If-None-Match'] = self.etag
            if self.last_modified is not None:
                extra_headers['If-Modified-Since'] = self.last_modified
            conn.request("GET", self.uri, headers=extra_headers)
            r = conn.getresponse()
            self.last_status = r.status
            if r.status == 200:  # OK
                data = r.read()
                if r.getheader('content-encoding', None) == 'gzip':
                    data = gzip.GzipFile(fileobj=StringIO.StringIO(data)).read()
                self.last_modified = r.getheader('last-modified')
                self.etag = r.getheader('etag')
        finally:
            # Release the connection even when the request raises.
            conn.close()
        return data

    def instance(self):
        """Return the cached object, refreshing it when it is stale.

        Returns None if nothing has been built successfully yet.
        """
        now = datetime.datetime.now()
        if self.instance_dt is None or (now - self.instance_dt) > self.refresh:
            contents = self.get_data()
            if contents:
                try:
                    fresh = self.instantiate_fn(contents)
                except Exception:
                    # Forget the validators, otherwise later requests get
                    # 304 and the body that failed is never fetched again.
                    self.etag, self.last_modified = None, None
                    raise
                # Only record the refresh once the object was really built.
                self.__instance = fresh
                self.instance_dt = now
            elif self.last_status == 304 and self.__instance is not None:
                # Not modified: the cached object is still current, so wait
                # a full refresh interval before asking again.
                self.instance_dt = now
        return self.__instance


urls = ('/', 'FrontPage',
        '/icalinate/(.*)', 'iCalinatePage')


# m is for meta
def make_uri_wi(uri, icalinate_kw, **kwargs):
    """Return a ``UriWatchInstantiate`` that builds ``iCalinate`` objects.

    Args:
        uri: address of the iCal feed.
        icalinate_kw: keyword arguments passed to ``iCalinate``.
        **kwargs: passed on to ``UriWatchInstantiate``.
    """
    def make_icalinate_cb(contents):
        try:
            return iCalinate(StringIO.StringIO(contents), **icalinate_kw)
        except vobject.base.ParseError:
            # Deal with absolutely horribly encoded feeds with a giant stick
            # (this is wrong, but it works in at least some cases): drop
            # every non-ASCII character and parse again.
            contents = ''.join([t for t in contents if ord(t) < 128])
            return iCalinate(StringIO.StringIO(contents), **icalinate_kw)
    return UriWatchInstantiate(uri, make_icalinate_cb, **kwargs)


sources = {
    'rtrfm': make_uri_wi('http://work.papercutmedia.com/rtr/ical',
                         {'offset': datetime.timedelta(hours=-8)}),
    'uwaevents': make_uri_wi('http://events.uwa.edu.au/view/publicaffairs/ics/publicaffairs.ics',
                             {'criteria': ["later"]}),
    'dna': make_uri_wi('http://www.dnalounge.com/calendar/dnalounge.ics', {}),
}


class FrontPage:
    """Index page listing the available feeds."""

    def __init__(self):
        pass

    def GET(self):
        """Print an HTML list of links, one per entry in ``sources``."""
        print("<html><head><title>iCalinate</title></head><body><p>Available feeds follow:</p>")
        # Sorted so the listing order is stable between requests.
        for source in sorted(sources):
            print('<a href="icalinate/%s">%s</a><br />' % (urllib.quote(source), cgi.escape(source)))
        print("</body></html>")


class iCalinatePage:
    """JSON summary page for a single feed."""

    def GET(self, source):
        """Print the JSON summary for *source*.

        Returns ``web.notfound()`` for an unknown source and
        ``web.internalerror()`` when no feed object is available.
        """
        if source not in sources:
            return web.notfound()
        instance = sources[source].instance()
        if instance is not None:
            print(instance.json_summary())
        else:
            return web.internalerror()  # fixme, is there a better code for this?


if __name__ == '__main__':
    web.run(urls, globals())
    # nated = iCalinate ('ical', offset=datetime.timedelta(hours=-8))
    # import pprint
    # nated.json_summary ()