Below is the file 'www/common.py' from this revision. You can also download the file.
import urllib import os from pyPgSQL import libpq from ConfigParser import ConfigParser install_path='/home/grahame/monotone/netinfo/' config = ConfigParser() config.read(os.path.join(install_path, 'etc/www.cfg')) def q(s): if s == None: return "NULL" else: return libpq.PgQuoteString(s) def build_tuple_dictionary(res): rv = [] for tuple_index in range(res.ntuples): rvn = {} for field_index in range(res.nfields): rvn[res.fname(field_index)] = res.getvalue(tuple_index, field_index) rv.append(rvn) return rv def get_dbconn(): if not config.has_option('database', 'connect_string'): return None connect_string = config.get('database', 'connect_string') return libpq.PQconnectdb(connect_string) def generate_graph(graph_name, contents): rv = { 'dot_file' : os.path.join(install_path, 'www', 'cache', '%s.dot' % (graph_name)), 'image_file' : os.path.join(install_path, 'www', 'cache', '%s.png' % (graph_name)), 'imagemap_file' : os.path.join(install_path, 'www', 'cache', '%s.html' % (graph_name)), 'dot_uri' : urllib.quote("cache/%s.dot" % (graph_name)), 'image_uri' : urllib.quote("cache/%s.png" % (graph_name)), 'imagemap_uri' : urllib.quote("cache/%s.html" % (graph_name)), } if os.access(rv['dot_file'], os.R_OK) and \ os.access(rv['image_file'], os.R_OK) and \ os.access(rv['imagemap_file'], os.R_OK) and \ open(rv['dot_file']).read() == contents: rv['cached'] = True else: open(rv['dot_file'], 'w').write(contents) os.system("/usr/bin/dot -Tcmapx -o %s -Tpng -o %s %s" % (rv['imagemap_file'], rv['image_file'], rv['dot_file'])) rv['cached'] = False return rv def html_escape(): "returns a function stolen from pydoc that can be used to escape HTML" import pydoc return pydoc.HTMLRepr().escape def devicel(hostname, descr=None): if not hostname: return '' if not descr: descr = hostname hq = html_escape() return "<a href='deviceinfo.psp?hostname=%s'>%s</a>" % (urllib.quote(hostname), hq(descr)) def ipl(ip, descr=None): if not ip: return '' if not descr: descr = ip hq = html_escape() return "<a href='ip.psp?ip=%s'>%s</a>" % (urllib.quote(ip), hq(descr)) def macl(mac, descr=None): if not mac: return '' if not descr: descr = mac hq = html_escape() return "<a href='mac.psp?mac=%s'>%s</a>" % (urllib.quote(mac), hq(descr)) def get_l3path(cnx, from_ip, to_ip): def q(s): if s == None: return "NULL" else: return libpq.PgQuoteString(s) def get_starting_point(): res = cnx.query("SELECT hostname FROM routing LIMIT 1") sp = build_tuple_dictionary(res) return sp[0]['hostname'] def hostname_from_ip(ip): res = cnx.query("SELECT hostname FROM adjacency WHERE ip_address=%s" % (q(ip))) res = build_tuple_dictionary(res) if len(res) == 0: return return res[0]['hostname'] def get_best_route(hostname, ip_address): query = "SELECT * FROM routing WHERE hostname=%s AND network >> %s ORDER BY masklen(network) DESC LIMIT 1" % (q(hostname), q(ip_address)) res = cnx.query(query) bestroute = build_tuple_dictionary(res) if len(bestroute) == 0: return None else: return bestroute[0] def get_router(ip_address): query = "SELECT hostname FROM addresses WHERE host(ip_address)=%s LIMIT 1" % (q(ip_address)) res = cnx.query(query) routers = build_tuple_dictionary(res) if len(routers): return routers[0]['hostname'] else: return None def get_path(from_hostname, to_ip): # returns a tuple # ([ .. list of network devices .. ], ip address via) # if the path actually terminates on one of our devices, the second entry in the tuple will be None path = [from_hostname] while True: current_router = path[-1] bestroute = get_best_route(current_router, to_ip) if not bestroute: return None, None # not in routing table, then it's unreachable. No path. # determine the best route available on our current_router if bestroute.has_key('via'): # ok, so do we know of anything with this IP address in our adjacency table? next_router = get_router(bestroute['via']) # check for failure to find it, or a loop if not next_router: # we don't know about this router, so we'll return that information return path, bestroute['via'] if next_router == current_router: break # all good, go to this host path.append(next_router) else: break # no 'via', must be connected return path, None #### # # ALGORITHM: # choose a starting point # go to that starting point, and determine the best route to our destination # # if that route is connected (eg. local) then we are done. Otherwise, we should # check the adjacency table to find out if the next hop is one of the routers this # system knows about. If no match, then we stop - we have found the router closest # to this IP address (in terms of our network). If there is a match, go to that # router and repeat the above steps. # # Once we have determined the closest router to our destination address, we can # backtrack - determine which route that router would use to get to our from # address. Recurse using the algorithm above until we find the closest router to # our from address. # # This should give us the layer 3 path. # #### starting_point = get_starting_point() # firstly, figure out the path from our arbitrary starting point to 'to_ip' # this tells us the last router in our network that services 'to_ip' init_path, init_via = get_path(starting_point, to_ip) if not init_path: return None, None, None endpoint_router = init_path[-1] # then, take the path from that router to 'from_ip' l3path, final_via = get_path(endpoint_router, from_ip) if not l3path: return None, None, None l3path.reverse() # l3path is now a list, showing the hops taken to get from # from_ip to to_ip that were within our network if init_via: # the hop to get to to_ip l3path.append(init_via) if final_via: # the hop to get to from_ip l3path = [final_via] + l3path return final_via == None, l3path, init_via == None