# Source file: www/common.py


import urllib
import os
from pyPgSQL import libpq
from ConfigParser import ConfigParser

# Root directory of the installation. The configuration file (etc/www.cfg)
# and the graph cache (www/cache) are both located relative to this path.
install_path='/home/grahame/monotone/netinfo/'

# Module-wide configuration, loaded once when this module is imported.
# ConfigParser.read() silently skips files it cannot open, so a missing
# www.cfg leaves the parser empty instead of raising here.
config = ConfigParser()
config.read(os.path.join(install_path, 'etc/www.cfg'))

def q(s):
	"""Quote a value for direct inclusion in a SQL statement.

	None is rendered as the SQL keyword NULL; any other value is passed
	to libpq.PgQuoteString and that result is returned.
	"""
	# identity test: an object with a custom __eq__ must not be mistaken for NULL
	if s is None:
		return "NULL"
	return libpq.PgQuoteString(s)

def build_tuple_dictionary(res):
        """Convert a query result into a list of per-row dictionaries.

        res must expose ntuples, nfields, fname(field_index) and
        getvalue(tuple_index, field_index). The returned list has one
        dictionary per tuple, mapping each field name to that tuple's value.
        """
        rows = []
        for row in range(res.ntuples):
                # later fields win if two fields share a name, as with plain assignment
                record = dict([(res.fname(col), res.getvalue(row, col))
                               for col in range(res.nfields)])
                rows.append(record)
        return rows

def get_dbconn():
	"""Open a database connection using the configured connect string.

	Reads option 'connect_string' from section 'database' of the module
	configuration. Returns the result of libpq.PQconnectdb, or None when
	the option is not configured.
	"""
	if config.has_option('database', 'connect_string'):
		return libpq.PQconnectdb(config.get('database', 'connect_string'))
	return None

def generate_graph(graph_name, contents):
	"""Render a Graphviz graph into the web cache, reusing earlier output when possible.

	graph_name -- base name used for the .dot, .png and .html files under
	              <install_path>/www/cache
	contents   -- the dot source text for the graph

	Returns a dictionary with the keys 'dot_file', 'image_file' and
	'imagemap_file' (filesystem paths), 'dot_uri', 'image_uri' and
	'imagemap_uri' (URL-quoted paths relative to the www directory), and
	'cached', which is True when all three files were already readable and
	the stored dot source equals contents, so nothing was regenerated.

	Otherwise the dot source is rewritten and /usr/bin/dot is run to produce
	the PNG image and the client-side image map. As before, the exit status
	of dot is not checked.
	"""
	# imported locally so the block does not rely on a new file-level import
	import subprocess

	cache_dir = os.path.join(install_path, 'www', 'cache')
	rv = {
		'dot_file' : os.path.join(cache_dir, '%s.dot' % (graph_name)),
		'image_file' : os.path.join(cache_dir, '%s.png' % (graph_name)),
		'imagemap_file' : os.path.join(cache_dir, '%s.html' % (graph_name)),
		'dot_uri' : urllib.quote("cache/%s.dot" % (graph_name)),
		'image_uri' : urllib.quote("cache/%s.png" % (graph_name)),
		'imagemap_uri' : urllib.quote("cache/%s.html" % (graph_name)),
		'cached' : False,
	}
	if os.access(rv['dot_file'], os.R_OK) and \
			os.access(rv['image_file'], os.R_OK) and \
			os.access(rv['imagemap_file'], os.R_OK):
		# close the handle explicitly rather than relying on garbage collection
		dot_fd = open(rv['dot_file'])
		try:
			rv['cached'] = dot_fd.read() == contents
		finally:
			dot_fd.close()
	if not rv['cached']:
		dot_fd = open(rv['dot_file'], 'w')
		try:
			dot_fd.write(contents)
		finally:
			# the file must be flushed and closed before dot reads it
			dot_fd.close()
		# Argument list instead of a shell string: paths containing spaces or
		# shell metacharacters are passed to dot verbatim and cannot be
		# interpreted by a shell.
		try:
			subprocess.call(['/usr/bin/dot',
				'-Tcmapx', '-o', rv['imagemap_file'],
				'-Tpng', '-o', rv['image_file'],
				rv['dot_file']])
		except OSError:
			# os.system() never raised when dot could not be started; keep
			# rendering best-effort so callers see the same behaviour.
			pass
		# NOTE(review): if dot fails, the new dot source is already on disk, so
		# a later call may report older image files as cached -- confirm
		# whether callers need the exit status.
	return rv

def html_escape():
	"""Return pydoc's HTML escaping routine (the bound escape method of an HTMLRepr)."""
	from pydoc import HTMLRepr
	escaper = HTMLRepr()
	return escaper.escape


def devicel(hostname, descr=None):
	"""Build an HTML link to the device information page for hostname.

	hostname -- device name; URL-quoted into the link target
	descr    -- link text, HTML-escaped; falls back to hostname when empty

	Returns '' when hostname is empty or None.
	"""
	if not hostname:
		return ''
	escape = html_escape()
	label = descr or hostname
	target = urllib.quote(hostname)
	return "<a href='deviceinfo.psp?hostname=%s'>%s</a>" % (target, escape(label))

def ipl(ip, descr=None):
	"""Build an HTML link to the IP address page for ip.

	ip    -- address; URL-quoted into the link target
	descr -- link text, HTML-escaped; falls back to ip when empty

	Returns '' when ip is empty or None.
	"""
	if not ip:
		return ''
	escape = html_escape()
	label = descr or ip
	target = urllib.quote(ip)
	return "<a href='ip.psp?ip=%s'>%s</a>" % (target, escape(label))

def macl(mac, descr=None):
	"""Build an HTML link to the MAC address page for mac.

	mac   -- address; URL-quoted into the link target
	descr -- link text, HTML-escaped; falls back to mac when empty

	Returns '' when mac is empty or None.
	"""
	if not mac:
		return ''
	escape = html_escape()
	label = descr or mac
	target = urllib.quote(mac)
	return "<a href='mac.psp?mac=%s'>%s</a>" % (target, escape(label))

def get_l3path(cnx, from_ip, to_ip):
	"""Determine the layer 3 path between two IP addresses from the routing data.

	ALGORITHM:
	Choose a starting point, go to it, and determine the best route to the
	destination. If that route is connected (e.g. local) we are done.
	Otherwise look up whether the next hop is one of the routers this system
	knows about. If there is no match we stop: we have found the router
	closest to the address (in terms of our network). If there is a match,
	go to that router and repeat.

	Once the router closest to to_ip is known, backtrack: apply the same walk
	from that router towards from_ip until the router closest to from_ip is
	found. Reversed, that walk is the layer 3 path.

	cnx     -- database connection; only cnx.query(sql) is used
	from_ip -- source IP address
	to_ip   -- destination IP address

	Returns a 3-tuple (from_on_network, l3path, to_on_network):
	  l3path          -- list of hops from from_ip to to_ip; router hostnames,
	                     with an unknown next-hop IP address at either end
	                     when the path leaves the known devices
	  from_on_network -- True when the walk towards from_ip ended on one of
	                     our devices (no unknown next hop)
	  to_on_network   -- the same for to_ip
	Returns (None, None, None) when there is no starting router or no route.
	"""
	def q(s):
		# quote a value for a SQL string; None becomes the SQL keyword NULL
		if s is None:
			return "NULL"
		return libpq.PgQuoteString(s)
	def get_starting_point():
		# any router will do as the arbitrary starting point
		res = cnx.query("SELECT hostname FROM routing LIMIT 1")
		sp = build_tuple_dictionary(res)
		if not sp:
			# empty routing table: nothing to start from
			return None
		return sp[0]['hostname']
	def get_best_route(hostname, ip_address):
		# longest-prefix match among this router's routes containing ip_address
		query = "SELECT * FROM routing WHERE hostname=%s AND network >> %s ORDER BY masklen(network) DESC LIMIT 1" % (q(hostname), q(ip_address))
		res = cnx.query(query)
		bestroute = build_tuple_dictionary(res)
		if not bestroute:
			return None
		return bestroute[0]
	def get_router(ip_address):
		# hostname of the known device owning ip_address, or None
		query = "SELECT hostname FROM addresses WHERE host(ip_address)=%s LIMIT 1" % (q(ip_address))
		res = cnx.query(query)
		routers = build_tuple_dictionary(res)
		if routers:
			return routers[0]['hostname']
		return None
	def get_path(from_hostname, to_ip):
		# returns a tuple
		# ([ .. list of network devices .. ], ip address via)
		# if the path actually terminates on one of our devices, the second entry in the tuple will be None
		path = [from_hostname]
		while True:
			current_router = path[-1]
			# determine the best route available on our current_router
			bestroute = get_best_route(current_router, to_ip)
			if not bestroute:
				# not in routing table, then it's unreachable. No path.
				return None, None
			# A missing or NULL 'via' means the route is connected. Looking up
			# a NULL next hop could never match a device, so skip the query.
			# (presumably NULL columns come back as None -- verify against pyPgSQL)
			via = bestroute.get('via')
			if via is None:
				break
			# ok, so do we know of anything with this IP address?
			next_router = get_router(via)
			if not next_router:
				# we don't know about this router, so we'll return that information
				return path, via
			# Routing loop: stop at any router already visited, not just the
			# current one, otherwise a loop such as A -> B -> A never terminates.
			if next_router in path:
				break
			# all good, go to this host
			path.append(next_router)
		return path, None
	starting_point = get_starting_point()
	if starting_point is None:
		return None, None, None
	# firstly, figure out the path from our arbitrary starting point to 'to_ip'
	# this tells us the last router in our network that services 'to_ip'
	init_path, init_via = get_path(starting_point, to_ip)
	if not init_path:
		return None, None, None
	endpoint_router = init_path[-1]
	# then, take the path from that router to 'from_ip'
	l3path, final_via = get_path(endpoint_router, from_ip)
	if not l3path:
		return None, None, None
	l3path.reverse()
	# l3path is now a list, showing the hops taken to get from
	# from_ip to to_ip that were within our network
	if init_via: # the hop to get to to_ip
		l3path.append(init_via)
	if final_via: # the hop to get to from_ip
		l3path = [final_via] + l3path
	return final_via is None, l3path, init_via is None