Below is the file 'cgi-bin/openid.fcgi' from this revision. You can also download the file.
#!/usr/bin/env python2.4
#
# UCCid, the UCC OpenID server.
#
# Standard libraries and path append.
import os, sys, shelve, re
from time import time
import threading
# Need this for web.py, flup, openid. web.py has been customised a bit. -- sj26
sys.path[0:0] = ["/home/wheel/sj26/lib/python/site-packages/"]
# OpenID imports... I know it's wacky but it makes the
# later code cleaner.
from openid.server.server import ProtocolError, Server as OpenIDServer
from openid.store.filestore import FileOpenIDStore
# Web.py and flup!
import web, cgi
import flup, flup.server.fcgi
from web.cheetah import render
from cgi import escape
from json import JsonWriter
import ldap
# TODO:
# * Remember me.
# * Multiple ID sources (tartarus? cyllene?)
# * Pretty up templates
# * Look at security of variable passing to templates and whether websafe
# or htmlquote should be used more.
# Some definitions.
# Filesystem root of the service and the public URL it is served from.
base_dir = "/services/http-openid/"
base_url = "https://secure.ucc.asn.au/openid/"
# Template for a member's identity URL; filled in with dict(user=<username>).
base_identity = "http://%(user)s.ucc.asn.au/"
# Inverse of base_identity: captures the username from an identity URL.
# The dots are escaped so that only the literal host suffix ".ucc.asn.au"
# matches; unescaped, "." matched any character and e.g.
# "http://a-ucc-asn-au/" was parsed as user "a".
username_re = re.compile(r"http://([a-zA-Z0-9_-]+)\.ucc\.asn\.au/")
# Had to hack up web.py a bit to make cheetah templating work how I wanted.
# Should probably copy the cheetah.py file out of web.py and keep our own
# version. -- sj26
# Hand the service's base directory to the customised cheetah module
# (presumably so it can locate the templates -- confirm against that module).
web.cheetah.base_dir = base_dir
# Url design and delegation, fairly boring.
# Flat sequence of (URL pattern, handler class name) pairs; the handler
# classes are defined further down this file. The parenthesised word in the
# first two patterns is presumably what CheckID.GET/POST receive as `mode`.
# TODO: As these are all boring URLs would it be worth changing the default
# web.py handler to a pure plaintext selector? -- sj26
urls = (
"/(login)", "CheckID",
"/(approve)", "CheckID",
"/account", "Account",
"/about", "About",
"/logout", "Logout",
# Catch-all: everything else goes to the OpenID protocol endpoint.
".*", "OpenID")
# Initialise the library.
# python-openid keeps its state in files under <base_dir>store/openid/.
store = FileOpenIDStore(base_dir + "store/openid/")
server = OpenIDServer(store)
# Make debugging fun!
# NOTE(review): this replaces web.py's internal-error handler with its debug
# handler; presumably that exposes tracebacks to the client -- confirm this
# is acceptable on the production host.
web.webapi.internalerror = web.debugerror
class TrustDB:
    """
    Holds the trust roots, and their approval states, for a specified identity.

    Entries live in a per-user shelve file (see trust_file()). Keys are trust
    roots; values are one of the APPROVED_* constants below.

    TODO: This probably needs moving to an SQL database. Postgres? -- sj26
    """
    # Approval states.
    APPROVED_ONCE = 1
    APPROVED_ALWAYS = 2
    # Lets callers pass the words "once"/"always" instead of the constants.
    approved_map = dict(once=APPROVED_ONCE, always=APPROVED_ALWAYS)

    def __init__(self, user):
        """
        Open (creating it if needed) the trust shelf belonging to `user`.

        Raises ValueError if `user` is empty or could not be used safely as a
        file name inside the trust directory.
        """
        # The user name becomes part of a file path, so refuse anything that
        # could step outside the trust directory.
        if not user or user in (".", ".."):
            raise ValueError("Invalid user name for trust database.")
        for forbidden in ("/", "\\", "\0"):
            if forbidden in user:
                raise ValueError("Invalid user name for trust database.")
        self.user = user
        self.shelf = shelve.open(self.trust_file(), flag='c')

    def __getitem__(self, trust_root):
        """ Returns the approval state for trust_root, or False if unknown. """
        return self.shelf.get(trust_root, False)

    def __setitem__(self, trust_root, trust_approval):
        """
        Records an approval state for trust_root. Accepts either an
        APPROVED_* constant or one of the keys of approved_map; anything else
        raises ValueError.
        """
        trust_approval = TrustDB.approved_map.get(trust_approval, trust_approval)
        if trust_approval not in TrustDB.approved_map.values():
            raise ValueError("Invalid trust approval.")
        self.shelf[trust_root] = trust_approval

    def __delitem__(self, trust_root):
        """ Forgets trust_root; raises KeyError if it was not stored. """
        del self.shelf[trust_root]

    def __iter__(self):
        """ Iterates over the stored trust roots. """
        return iter(self.shelf)

    def __repr__(self):
        return "<TrustDB %s>" % repr(self.shelf)

    def items(self):
        """ Returns the (trust_root, approval) pairs. """
        return self.shelf.items()

    def get(self, *a, **kw):
        """ Same as the underlying shelf's get(). """
        return self.shelf.get(*a, **kw)

    def close(self):
        """ Flushes and closes the underlying shelf. """
        self.shelf.close()

    def trust_file(self):
        """ Returns the name of the trust file for the current user. """
        return base_dir + "store/trust/" + self.user
def is_trusted(trust_root, by=None):
    """
    Reports how far a user trusts trust_root.

    `by` names the user whose trust database is consulted; when omitted the
    user stored in the current session is used. The stored approval state is
    returned (TrustDB returns False when there is no entry). A
    TrustDB.APPROVED_ONCE entry is single-use: it is deleted by this lookup.
    """
    if by is None:
        by = web.session.user
    database = TrustDB(by)
    state = database[trust_root]
    if state == TrustDB.APPROVED_ONCE:
        # One-shot approval: consume it now that it has been read.
        del database[trust_root]
    return state
def is_logged_in(username=None):
    """
    Tells whether the current session belongs to a logged in user.

    With `username` given, the session must belong to exactly that user.
    In either case a session that recorded an IP address must be used from
    that same address.
    """
    # No user recorded in the session: nobody is logged in.
    if web.session.get("user", None) is None:
        return False
    # A specific user was asked for and it is not the session's user.
    if username is not None and username != web.session.user:
        return False
    # A session without a recorded IP falls back to the current one and passes.
    return web.session.get("ip", web.ctx.ip) == web.ctx.ip
def logged_in(f):
    """
    Decorator which will redirect to login page if not logged in.

    When the session is logged in, the wrapped handler is called with the
    original arguments and its return value is passed back to the caller.
    """
    def internal(*a, **kw):
        if not is_logged_in():
            return web.seeother("login")
        # Hand back whatever the handler returns; it used to be dropped.
        return f(*a, **kw)
    # Keep the handler's identity visible (functools.wraps is not available
    # on the Python version named in the shebang).
    internal.__name__ = f.__name__
    internal.__doc__ = f.__doc__
    return internal
def verify_session_ip():
    """
    Stop session hijacks.

    Load hook: takes the client address from X-Forwarded-For when that
    header is present, and if the session was recorded against a different
    address, clears the session's user and ip and answers 403.
    Returns False in that case and None otherwise.
    """
    # NOTE(review): X-Forwarded-For is taken at face value here; presumably
    # the front-end proxy always sets it -- confirm.
    web.ctx.ip = web.ctx.env.get("HTTP_X_FORWARDED_FOR", web.ctx.ip)
    if web.session.get('ip', None) is None:
        # Session is not bound to an address yet; nothing to verify.
        return
    if web.session.ip == web.ctx.ip:
        return
    # Address changed underneath the session: drop the login and refuse.
    web.session.user = None
    web.session.ip = None
    web.ctx.status = "403"
    web.ctx.output = "Session is invalid."
    return False


# Register the check in web.py's load hooks.
web._loadhooks['verify_session_ip'] = verify_session_ip
def username_from_identity(identity):
    """
    Extract the username from an identity.

    Returns the part of the identity URL captured by username_re, or None
    when the identity does not match that pattern.
    """
    found = username_re.match(identity)
    if found is None:
        return None
    return found.group(1)
def get_request(query=None, request_key=None):
    """
    Get the current OpenID request. Returns None on failure.

    query -- the request parameters to inspect; defaults to web.input().
    request_key -- session key under which a request was stored earlier;
        defaults to the 'request' parameter of the query, if any.

    A request stored in the session under request_key wins; otherwise the
    query parameters themselves are decoded as an OpenID request.
    """
    # Session keys that hold login state and must never be handed back as
    # if they were a stored OpenID request.
    reserved_keys = ('user', 'ip')
    # Honour a query supplied by the caller (it used to be overwritten).
    if query is None:
        query = web.input()
    # Grab from session if possible
    if request_key is None and 'request' in query:
        request_key = query['request']
    # The reserved-key check now also covers an explicitly passed key.
    if request_key is not None and request_key not in reserved_keys:
        request = web.session.get(request_key, None)
        if request is not None:
            return request
    # Otherwise look for the request in query parameters
    try:
        return server.decodeRequest(query)
    except ProtocolError:
        # Deliberately best-effort: callers treat None as "no request".
        return None
def valid_request(request):
    """
    Make sure the request is valid and legal.

    A missing request (None) counts as valid. For a real request the
    return_to address must lie within the trust root; when it does not, a
    redirect to the request's cancel URL is issued and False is returned.
    """
    if request is None or request.trustRootValid():
        return True
    # TODO: should probably explain this to the user
    web.seeother(request.getCancelURL())
    return False
def request_identity(request):
    """
    Returns (identity, username) for an OpenID request.

    The username is derived from the identity with username_from_identity().
    Without a request both values are None.
    """
    if request is None:
        return None, None
    identity = request.identity
    return identity, username_from_identity(identity)
class CheckID:
    """
    Browser-facing login and approval pages for an OpenID request.

    Both pages share one template ("checkid.html"); `mode` is either
    "login" or "approve" and selects which one is shown or submitted.
    """
    # User names allowed into the LDAP DN: the same characters username_re
    # accepts in an identity URL. \Z (not $) so a trailing newline fails.
    _safe_username = re.compile(r"[a-zA-Z0-9_-]+\Z")

    def GET(self, mode, failed=False, failed_username=None, request_key=None):
        """
        Show the login or approval form, or redirect when it is not needed.

        mode -- "login" or "approve".
        failed, failed_username -- set when re-displaying after a failed login.
        request_key -- session key of the pending OpenID request; defaults to
            the 'request' query parameter.
        """
        if request_key is None:
            request_key = web.input().get('request', None)
        request = get_request(request_key=request_key)
        # valid_request() has already issued the cancel redirect on failure;
        # let it stand instead of tripping an assertion.
        if not valid_request(request):
            return
        identity, username = request_identity(request)
        default_username = None
        if username is None and failed_username is not None:
            default_username = failed_username
            identity = base_identity % dict(user=default_username)
        # Redirect if necessary
        if mode == "login" and is_logged_in(username):
            if request is not None:
                return web.seeother("approve?request=%s" % request_key)
            return web.seeother("account")
        elif mode == "approve":
            if request is None:
                # Nothing to approve; previously this crashed on
                # request.trust_root.
                return web.seeother("account")
            if not is_logged_in(username):
                return web.seeother("login?request=%s" % request_key)
            if is_trusted(request.trust_root):
                return web.seeother(base_url + "?request=" + request_key)
        # For the template
        trust_root = None
        if request is not None:
            trust_root = request.trust_root
        if username is None:
            username = ""
        terms = {'username': username, 'identity': identity, 'failed': failed,
                 'base_identity': base_identity, 'trust_root': trust_root,
                 'default_username': default_username, 'request': request,
                 'request_key': request_key}
        if mode == "login":
            terms.update({'title': "Login", 'mode': "login",
                          'submit_name': "Login", 'cancel_name': "Cancel"})
        else:
            terms.update({'title': "Identification Approval", 'mode': "approve",
                          'submit_name': "Approve", 'cancel_name': "Cancel"})
        return render("checkid.html", terms=terms)

    def POST(self, mode):
        """
        Handle submission of the login or approval form.

        Raises Exception for a mode other than "login" or "approve".
        """
        query = web.input()
        request_key = query.get('request', '')
        request = get_request()
        if not valid_request(request):
            # The cancel redirect has already been issued.
            return
        if "cancel" in query:
            if request is not None:
                return web.seeother(request.getCancelURL())
            return web.seeother("account")
        # Beyond here I'm assuming submit (hitting enter won't set the query parameter)
        if mode == "login":
            query = web.input("username", "password")
            if not self._authenticate(query.username, query.password):
                return self.GET("login", failed=True, failed_username=query.username)
            web.session.user = query.username
            web.session.ip = web.ctx.ip
            if request is None:
                return web.seeother("account")
            if is_trusted(request.trust_root):
                return web.seeother(base_url + "?request=" + request_key)
            return web.seeother("approve?request=" + request_key)
        elif mode == "approve":
            if request is None:
                return web.seeother("account")
            identity, username = request_identity(request)
            # Only the logged in owner may approve.
            if not is_logged_in(username):
                return web.seeother("login?request=" + request_key)
            approval = TrustDB.APPROVED_ONCE
            if query.get("always", False):
                approval = TrustDB.APPROVED_ALWAYS
            # Write to the session user's own trust database, never to one
            # named by a form field.
            trustdb = TrustDB(web.session.user)
            trustdb[request.trust_root] = approval
            return web.seeother(base_url + "?request=" + request_key)
        else:
            raise Exception("Strange %s %r." % (mode, web.ctx.query))

    def _authenticate(self, username, password):
        """
        Returns True when the LDAP server accepts username/password.

        LDAP errors other than invalid credentials propagate to the caller.
        """
        # The name is interpolated into a DN, so only let through characters
        # that cannot change the DN's structure.
        if not username or not self._safe_username.match(username):
            return False
        # A simple bind with an empty password is an unauthenticated bind,
        # which a server may accept; never treat that as a login.
        if not password:
            return False
        dn = "uid=%s,ou=People,dc=ucc,dc=gu,dc=uwa,dc=edu,dc=au" % username
        try:
            result = ldap.open("localhost").simple_bind_s(dn, password)
        except ldap.INVALID_CREDENTIALS:
            return False
        # 97 is the result type expected from a successful bind
        # (presumably ldap.RES_BIND -- confirm against python-ldap).
        return result[0] == 97

    def request_fields(self, request):
        """
        Returns the openid.* parameters of the current query as hidden
        <input> elements. `request` is accepted for compatibility but unused.
        """
        fields = []
        for name, value in web.input().items():
            if name.startswith("openid."):
                # quote=True: the values sit inside double-quoted attributes.
                fields.append("""<input type="hidden" name="%s" value="%s" />"""
                              % (escape(name, True), escape(value, True)))
        return "".join(fields)
class Account:
    """
    Shows account information: the sites the logged in user always trusts,
    with the option of revoking one of them.
    """
    @logged_in
    def GET(self, success=None, notice=None):
        """
        Render the account page, or a JSON status when 'json' is in the query.

        An 'untrust' query parameter names a trust root to remove first.
        success, notice -- optional initial status to report; they are
        overridden by the outcome of an untrust action.
        """
        query = web.input()
        # Respect values passed in by a caller (they used to be discarded).
        if success is None:
            success = False
        self.trust_db = TrustDB(web.session.user)
        if 'untrust' in query:
            if self.trust_db.get(query.untrust, None) is not None:
                del self.trust_db[query.untrust]
                success = True
                notice = "The site %s is no longer trusted." % query.untrust
            else:
                notice = "You do not currently trust that site."
        if 'json' in query:
            payload = dict(success=success)
            if notice is not None:
                payload['notice'] = notice
            # write() is called on an instance; it used to be called on the
            # class itself, which passes the dict in place of self.
            print(JsonWriter().write(payload))
            return
        # Only permanent approvals are listed.
        sites = [site for site, approval in self.trust_db.items()
                 if approval == TrustDB.APPROVED_ALWAYS]
        print(render("account.html", terms=dict(
            success=success, notice=notice, sites=sites,
            user=web.session.user, ip=web.session.ip)))
class Logout:
    """ Ends the current session and sends the browser back to base_url. """

    def GET(self):
        # Throw the session away, then redirect.
        web.session.invalidate()
        destination = base_url
        return web.seeother(destination)

    # POST behaves exactly like GET.
    POST = GET
class OpenID:
    """
    Handles the server-to-server communication of openid.
    Will redirect the user to an appropriate browser page if necessary.
    """
    # Reason phrases for the status line. Codes not listed here are sent
    # without a phrase instead of raising KeyError.
    status_reasons = {200: "OK", 302: "Found", 400: "Bad Request"}

    def GET(self):
        """
        Decode the OpenID request and answer it.

        Without any OpenID request the browser is sent to the about page; a
        request that cannot be decoded gets a 400 Bad Request.
        """
        request = get_request()
        if request is None:
            query = web.input()
            if 'openid.mode' not in query:
                return web.seeother("about")
            try:
                request = server.decodeRequest(query)
            except ProtocolError:
                request = None
            if request is None:
                # Stop here; falling through used to crash on request.mode.
                return web.badrequest()
        # We handle login requests, but nothing else. Hooray python-openid!
        if request.mode in ["checkid_immediate", "checkid_setup"]:
            # Get username from requested identity, if possible.
            identity, username = request_identity(request)
            # An identity that does not name one of our users can never be
            # asserted. Without the None check, is_logged_in(None) is true
            # for any logged in member.
            is_owner = username is not None and is_logged_in(username)
            answer, mode = False, "login"
            if is_owner:
                if is_trusted(request.trust_root):
                    # Logged in and the site is trusted: bounce straight back.
                    answer, mode = True, None
                else:
                    # Logged in, but the site still needs approval.
                    mode = "approve"
            if answer:
                response = request.answer(True)
            elif request.mode == "checkid_immediate":
                # checkid_immediate must not show pages of our own; tell the
                # consumer where the user can complete the process.
                response = request.answer(False, base_url + mode + web.ctx.query)
            elif username is None:
                # No local user could ever satisfy this request; decline it
                # rather than offering a login page.
                response = request.answer(False)
            else:
                # checkid_setup lets us do our login/approve process, so park
                # the request in the session and show the appropriate page.
                # TODO: use random identifier
                request_key = web.session.generateIdentifier()
                web.session[request_key] = request
                return CheckID().GET(mode, request_key=request_key)
        # It's python-openid's job.
        else:
            response = server.handleRequest(request)
        # Turn the response into an HTTP message
        response = server.encodeResponse(response)
        # Spit out the response in web.py form
        reason = self.status_reasons.get(response.code)
        if reason is None:
            web.ctx.status = str(response.code)
        else:
            web.ctx.status = "%d %s" % (response.code, reason)
        for header, value in response.headers.items():
            web.header(header, value)
        print(response.body)

    # Same deal for both methods.
    POST = GET
class About:
    """ Serves the about page from the "index.html" template. """

    def GET(self):
        # The template is given the session's current user.
        # NOTE(review): the result of render() is neither returned nor
        # printed here, unlike the other handlers -- presumably the
        # customised render emits the page itself; confirm.
        page_terms = dict(user=web.session.user)
        render("index.html", terms=page_terms)
def cleanup_sessions():
    """
    Cleans up sessions, then schedules itself to run again in 5 minutes.
    """
    try:
        # NOTE(review): when invoked from a Timer this runs on the timer's
        # thread; confirm web.env is usable outside a request.
        web.env['com.saddi.service.session']._store.periodic()
    finally:
        # Re-arm even if this pass failed. The Timer has to be started
        # explicitly; merely constructing it (as before) schedules nothing.
        timer = threading.Timer(300.0, cleanup_sessions)
        # Daemon thread, so a pending timer never keeps the process alive.
        timer.setDaemon(True)
        timer.start()
if __name__ == "__main__":
    # Session cookie is limited to the OpenID path on the secure host.
    cookieAttributes = {'domain': 'secure.ucc.asn.au', 'path': '/openid/',
                        'secure': 'secure'}
    # Sessions are kept on disk under <base_dir>store/session/ and start
    # out with no user and no ip.
    session_mw = web.sessions(web.DiskSessionStore,
                              storeDir=base_dir + "store/session/",
                              defaults={"user": None, "ip": None},
                              cookieAttributes=cookieAttributes)
    # Schedule the first session cleanup in 5 minutes. The Timer must be
    # started; the original only constructed it, so cleanup never ran.
    cleanup_timer = threading.Timer(300.0, cleanup_sessions)
    # Daemon thread, so the pending timer cannot block process exit.
    cleanup_timer.setDaemon(True)
    cleanup_timer.start()
    # Make web.run serve over FastCGI.
    sys.argv.append("fastcgi")
    web.run(urls, locals(), session_mw)