Below is the file 'viewmtn.py' from this revision. You can also download the file.
#!/usr/bin/env python

# Copyright (C) 2005 Grahame Bowland <grahame@angrygoats.net>
#
# This program is made available under the GNU GPL version 2.0 or
# greater. See the accompanying file COPYING for details.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE.

import os
import cgi
import mtn
import sha
import sys
import web
import json
import struct
import string
import rfc822
import config
import common
import urllib
import urlparse
import syntax
import tarfile
import tempfile
import datetime
import cStringIO
from colorsys import hls_to_rgb
from fdo import sharedmimeinfo, icontheme
import release

# short alias used throughout the file for HTML-escaping text
hq = cgi.escape

import heapq
import binascii
from itertools import izip, chain, repeat

import web
debug = web.debug

# purloined from: http://docs.python.org/lib/itertools-recipes.html
def grouper(n, iterable, padvalue=None):
    """Return an iterator over consecutive n-tuples taken from iterable.

    The final tuple is padded with padvalue when the input length is not a
    multiple of n:

    grouper(3, 'abcdefg', 'x') --> ('a','b','c'), ('d','e','f'), ('g','x','x')
    """
    # the *same* iterator object is passed to izip n times, so each output
    # tuple consumes n consecutive items; the n-1 pad values guarantee the
    # last partial group can still be completed.
    return izip(*[chain(iterable, repeat(padvalue, n-1))]*n)

# /about.psp -> /about
# /branch.psp -> /branch/{branch}/
# /fileinbranch.psp -> /branch/{branch}/file/path (redir)
# /headofbranch.psp -> /branch/{branch}/head
# /tarofbranch.psp -> /branch/{branch}/tar
# /revision.psp -> /revision/{id}
# /diff.psp -> /revision/{id}/diff/{id}[/{fname}]
# /file.psp -> /revision/{id}/file/{path}
# /manifest.psp -> /revision/{id}/browse/{subdir}
# /getfile.py -> /revision/{id}/file/{path}&download #???
# /getdiff.py -> /revision/{id}/diff/{id}[/{fname}]&download #???
# /gettar.py -> /revision/{id}/tar
# /error.psp -> /error (perhaps not needed)
# /help.psp -> /help
# /index.psp -> /
# /tags.psp -> /tags
# /getjson.py -> /json[...]
(private) dynamic_join = lambda path: urlparse.urljoin(config.dynamic_uri_path, path) static_join = lambda path: urlparse.urljoin(config.static_uri_path, path) def quicklog(changelog, max_size=None): interesting_line = None for line in changelog: line = line.strip() if line: interesting_line = line break if not interesting_line: return "" if interesting_line.startswith('*'): interesting_line = interesting_line[1:].strip() if max_size and len(interesting_line) > max_size: interesting_line = interesting_line[:max_size] r_wspc = interesting_line.rfind(' ') if r_wspc <> -1: interesting_line = interesting_line[:r_wspc] interesting_line += '..' return interesting_line def timecert(certs): revdate = None for cert in certs: if cert[4] == 'name' and cert[5] == 'date': revdate = common.parse_timecert(cert[7]) return revdate def nbhq(s): return ' '.join([hq(t) for t in s.split(' ')]) def normalise_changelog(changelog): changelog = map(hq, changelog.split('\n')) if changelog and changelog[-1] == '': changelog = changelog[:-1] return changelog class Link(object): def __init__(self, description=None, link_type=None, **kwargs): self.absolute_uri = None self.relative_uri = None self.description = description self.json_args = None def uri(self): return dynamic_join(self.relative_uri) def html(self, override_description=None, force_nbsp=False): if override_description: if force_nbsp: d = nbhq(override_description) else: d = hq(override_description) else: d = self.description if self.relative_uri: uri = dynamic_join(self.relative_uri) elif self.absolute_uri: uri = self.absolute_uri else: return self.description rv = '<a href="%s">%s</a>' % (uri, d) if self.json_args != None: enc_args = binascii.hexlify(json.write(self.json_args)) rv = '<span id="js_%s" class="%s">' % (hq(enc_args), hq(str(self.__class__).split('.')[-1])) + rv + '</span>' return rv class AuthorLink(Link): def __init__(self, author, **kwargs): Link.__init__(*(self, ), **kwargs) name, email = rfc822.parseaddr(author) 
self.description = author if email: self.absolute_uri = "mailto:%s" % urllib.quote(email) if name: self.description = hq(name) class RevisionLink(Link): def __init__(self, revision, **kwargs): link_type = kwargs.get("link_type") if link_type == "browse": subpage = "browse" else: subpage = "info" Link.__init__(*(self, ), **kwargs) self.json_args = [str(revision)] self.relative_uri = 'revision/%s/%s' % (subpage, revision) self.description = revision.abbrev() class TagLink(Link): def __init__(self, tag, **kwargs): Link.__init__(*(self, ), **kwargs) self.relative_uri = 'revision/info/%s' % (tag.revision) self.description = tag.name class BranchLink(Link): def __init__(self, branch, **kwargs): Link.__init__(*(self, ), **kwargs) self.json_args = [branch.name] from_change, to_change = kwargs.get('from_change'), kwargs.get('to_change') if from_change and to_change: self.relative_uri = 'branch/changes/%s/from/%d/to/%d' % (urllib.quote(branch.name, safe = ''), from_change, to_change) else: self.relative_uri = 'branch/changes/' + urllib.quote(branch.name, safe = '') self.description = hq(branch.name) class DiffLink(Link): def __init__(self, diff, **kwargs): Link.__init__(*(self, ), **kwargs) if kwargs.get('link_type', None) == "raw": mode = "rawdiff" else: mode = "diff" self.relative_uri = 'revision/%s/' % (mode) + diff.from_rev + '/with/' + diff.to_rev if isinstance(diff.fname, list): # TODO: figure out a linking scheme for diffs of multiple files. 
if len(diff.fname) > 0: fname = diff.fname[0] else: fname = None else: fname = diff.fname if fname is not None: self.relative_uri += '/'+urllib.quote(fname) self.description = "diff" class DirLink(Link): def __init__(self, file, **kwargs): Link.__init__(*(self, ), **kwargs) # handle the root directory if file.name == '/': fn = '' else: fn = file.name self.relative_uri = 'revision/browse/' + file.in_revision + '/' + urllib.quote(fn) self.description = hq(file.name) class FileLink(Link): def __init__(self, file, **kwargs): Link.__init__(*(self, ), **kwargs) if kwargs.has_key('for_download'): access_method = 'downloadfile' elif kwargs.has_key('for_changes'): access_method = 'filechanges' elif kwargs.has_key('for_changes_rss'): access_method = 'filechanges/rss' else: access_method = 'file' self.relative_uri = 'revision/' + access_method + '/' + file.in_revision + '/' + urllib.quote(file.name) self.description = hq(file.name) class Diff(object): def __init__(self, from_rev, to_rev, fname=None): self.obj_type = 'diff' self.fname = fname self.from_rev = from_rev self.to_rev = to_rev def prettify(s): return ' '.join([hq(x[0].upper() + x[1:]) for x in s.replace("_", "").split(" ")]) def certs_for_template(cert_gen): for cert in cert_gen: if cert[0] == 'key' and len(cert) != 10: raise Exception("Not a correctly formatted certificate: %s" % cert) if cert[3] != 'ok': raise Exception("Certificate failed check.") key = cert[1] name = cert[5] value = cert[7] if name == "branch": value = link(mtn.Branch(value)).html() else: value = '<br />'.join(map(hq, value.split('\n'))) yield { 'key' : key, 'name' : prettify(name), 'value' : value } def revisions_for_template(revision, rev_gen): old_revisions = [] stanzas = [] grouping = None for stanza in rev_gen: stanza_type = stanza[0] description, value = prettify(stanza_type), None if grouping == None: grouping = description if description != grouping: if len(stanzas) > 0: yield grouping, stanzas grouping, stanzas = description, [] if 
stanza_type == "format_version" or \ stanza_type == "new_manifest": continue elif stanza_type == "patch": fname, from_id, to_id = stanza[1], stanza[3], stanza[5] # if from_id is null, this is a new file # since we're showing that information under "Add", so # skip it here if not from_id: continue diff_links = ','.join([link(Diff(old_revision, revision, fname)).html() for old_revision in old_revisions]) value = "Patch file %s (%s)" % (link(mtn.File(fname, revision)).html(), diff_links) elif stanza_type == "old_revision": old_revision = mtn.Revision(stanza[1]) if old_revision.is_empty: value = "This revision is has no ancestor." else: old_revisions.append(old_revision) value = "Old revision is: %s (%s)" % (link(old_revision).html(), link(Diff(old_revision, revision)).html()) elif stanza_type == "add_file": fname = stanza[1] value = "Add file: %s" % (link(mtn.File(fname, revision)).html()) elif stanza_type == "add_dir": dname = stanza[1] value = "Add directory: %s" % (hq(dname)) elif stanza_type == "delete": fname = stanza[1] value = "Delete: %s" % (hq(fname)) elif stanza_type == "set": fname, attr, value = stanza[1], stanza[3], stanza[5] value = "Set attribute '%s' to '%s' upon %s" % (hq(attr), hq(value), link(mtn.File(fname, revision)).html()) elif stanza_type == "rename": oldname, newname = stanza[1], stanza[3] value = "Rename %s to %s" % (hq(oldname), link(mtn.File(newname, revision)).html()) else: value = "(this stanza type is not explicitly rendered; please report this.)\n%s" % hq(str(stanza)) if description != None: stanzas.append(value) if len(stanzas) > 0: yield grouping, stanzas type_to_link_class = { 'author' : AuthorLink, 'branch' : BranchLink, 'diff' : DiffLink, 'dir' : DirLink, 'file' : FileLink, 'revision' : RevisionLink, 'tag' : TagLink, } def link(obj, link_type=None, **kwargs): link_class = type_to_link_class.get(obj.obj_type) if not link_class: raise LinkException("Unable to link to objects of type: '%s'" % (obj.obj_type)) # ugh if link_type: 
kwargs['link_type'] = link_type return link_class(obj, **kwargs) class ComparisonRev: def __init__(self, ops, revision): self.revision = revision self.certs = list(ops.certs(self.revision)) self.date = None for cert in self.certs: if cert[4] == 'name' and cert[5] == 'date': self.date = common.parse_timecert(cert[7]) def __repr__(self): return "ComparisonRev <%s>" % (repr(self.revision)) def __cmp__(self, other): # irritating edge-case, heapq compares us to empty string if # there's only one thing in the list if not other: return 1 return cmp(other.date, self.date) class Renderer(object): def __init__(self): # any templates that can be inherited from, should be added to the list here self.templates = [ ('base.html', 'base'), ('revision.html', 'revision'), ('branch.html', 'branch'), ('revisionfile.html', 'revisionfile'), ('revisionfileview.html', 'revisionfileview') ] self._templates_loaded = False # these variables will be available to any template self.terms = { 'dynamic_uri_path' : config.dynamic_uri_path, 'dynamic_join' : dynamic_join, 'urllib_quote' : urllib.quote, 'static_uri_path' : config.static_uri_path, 'static_join' : static_join, 'link' : link, 'version' : release.version, } def load_templates(self): if self._templates_loaded: return for template, mod_name in self.templates: web.render(template, None, True, mod_name) self._templates_loaded = True def render(self, template, **kwargs): self.load_templates() terms = self.terms.copy() terms.update(kwargs) web.render(template, terms) class OperationsFactory(object): def __init__ (self): # has the user specified a dbfiles hash? 
if so, use it self.ops_instances = {} self.default = None if hasattr (config, "dbfiles"): for name in config.dbfiles: self.ops_instances[name] = mtn.Operations([config.monotone, config.dbfiles[name]]) if hasattr (config, "defaultdb"): self.default = config.defaultdb else: self.ops_instances["legacy"] = mtn.Operations([config.monotone, config.dbfile]) self.default = "legacy" def get_ops(self, name): ops = None if name is None: ops = self.ops_instances.get (self.default, None) else: name = name.rstrip('/') ops = self.ops_instances.get (name, None) # technically it'd be better to do this before serving the # request, however this is about the only per-request # spot that runs for every handler.. if not ops is None: ops.per_request() return ops # # TODO: try and wrap these globals up in a single global object # renderer = Renderer() op_fact = OperationsFactory() ##ops = mtn.Operations([config.monotone, config.dbfile]) mimehelp = sharedmimeinfo.LookupHelper(getattr(config, "mime_map", None)) if config.icon_theme: try: mimeicon = icontheme.MimeIcon(icontheme.IconTheme(config.icon_theme), config.icon_size) except Exception, e: print>>sys.stderr, "\nFailed to load icon theme: perhaps you want to set config.icon_theme = None\n" raise e else: mimeicon = None from mk2 import MarkovChain class BranchDivisions(object): def __init__ (self): self.divisions = None def calculate_divisions (self, branches): if self.divisions != None: return chain = MarkovChain (2, join_token='.', cutoff_func=MarkovChain.log_chunkable) for branch in branches: chain.update (branch.name.split ('.')) chain.upchunk () divisions = set () for branch in branches: for chunk in chain.upchunked: idx = branch.name.find (chunk) if idx != -1: divisions.add (branch.name[idx:idx+len(chunk)]) self.divisions = list(divisions) self.divisions.sort () divisions = BranchDivisions () class Index(object): def GET(self, ops): branches = list(ops.branches ()) divisions.calculate_divisions (branches) def division_iter(): 
bitter = iter(branches) divs = divisions.divisions n_divs = len(divs) in_divs = {} look_for = 0 def new_div (n): did = look_for in_divs[n] = did return "d", did, mtn.Branch(n), len(in_divs.keys ()) * 10 def end_div (n): did = in_divs.pop (n) return "e", did, mtn.Branch(n), len(in_divs.keys ()) * 10 def branch_line (b): return "b", 0, branch, 0 for branch in bitter: for div in in_divs.keys(): # we alter it in the loop, copy.. if branch.name.find (div) != 0: yield end_div (div) if look_for < n_divs: if cmp(branch, divs[look_for]) > 0: look_for += 1 if branch.name.find (divs[look_for]) == 0: yield new_div (divs[look_for]) look_for += 1 yield branch_line (branch) # any stragglers need to be closed for div in in_divs.keys(): yield end_div (div) renderer.render('index.html', page_title="Branches", branches=division_iter()) class About(object): def GET(self): renderer.render('about.html', page_title="About") class Tags(object): def GET(self, ops, restrict_branch=None): # otherwise we couldn't use automate again.. tags = list(ops.tags()) if restrict_branch != None: restrict_branch = mtn.Branch(restrict_branch) def tag_in(tag): for branch in tag.branches: if branch.name == restrict_branch.name: return tag return None tags = filter(tag_in, tags) template_file = "branchtags.html" else: template_file = "tags.html" tags.sort(lambda t1, t2: cmp(t1.name, t2.name)) def revision_ago(rev): rv = "" for cert in ops.certs(rev): if cert[4] == 'name' and cert[5] == 'date': revdate = common.parse_timecert(cert[7]) rv = common.ago(revdate) return rv renderer.render(template_file, page_title="Tags", tags=tags, revision_ago=revision_ago, branch=restrict_branch) class Help(object): def GET(self): renderer.render('help.html', page_title="Help") class Changes(object): def __get_last_changes(self, ops, start_from, parent_func, selection_func, n): """returns at least n revisions that are parents of the revisions in start_from, ordered by time (descending). 
selection_func is called for each revision, and that revision is only included in the result if the function returns True.""" # revised algorithm in colaboration with Matthias Radestock # # use a heapq to keep a list of interesting revisions # pop from the heapq; get the most recent interesting revision # insert the parents of this revision into the heap # .. repeat until len(heap) >= to_count if not start_from: raise Exception("get_last_changes() unable to find somewhere to start.") last_result = None in_result = set() result = [] revq = [] for rev in start_from: heapq.heappush(revq, ComparisonRev(ops, rev)) while len(result) < n: # print >>sys.stderr, "start_revq state:", map(lambda x: (x.revision, x.date), revq) # update based on the last result we output if last_result != None: parents = filter(None, parent_func(last_result.revision)) for parent_rev in parents: if parent_rev == None: continue heapq.heappush(revq, ComparisonRev(ops, parent_rev)) # try and find something we haven't already output in the heap last_result = None while revq: candidate = heapq.heappop(revq) if not (candidate.revision in in_result) and selection_func(candidate.revision): last_result = candidate break if last_result == None: break # follow the newest edge in_result.add(last_result.revision) result.append(last_result) rv = map (lambda x: (x.revision, x.certs), result), revq return rv def on_our_branch(self, ops, branch, revision): rv = False for cert in ops.certs(revision): if cert[4] == 'name' and cert[5] == 'branch': if cert[7] == branch.name: rv = True return rv def for_template(self, ops, revs, pathinfo=None, constrain_diff_to=None): rv = [] for rev, certs in revs: rev_branch = "" revision, diffs, ago, author, changelog, shortlog, when = mtn.Revision(rev), [], "", mtn.Author(""), "", "", "" for cert in certs: if cert[4] != 'name': continue if cert[5] == "branch": rev_branch = cert[7] elif cert[5] == 'date': revdate = common.parse_timecert(cert[7]) ago = common.ago(revdate) when = 
revdate.strftime('%a, %d %b %Y %H:%M:%S GMT') elif cert[5] == 'author': author = mtn.Author(cert[7]) elif cert[5] == 'changelog': changelog = normalise_changelog(cert[7]) # NB: this HTML escapes! shortlog = quicklog(changelog) # so this is also HTML escaped. if pathinfo != None: filename = pathinfo.get(rev) else: filename = None if constrain_diff_to: diff_to_revision = mtn.Revision(constrain_diff_to) else: diff_to_revision = revision for stanza in ops.get_revision(rev): if stanza and stanza[0] == "old_revision": diffs.append(Diff(mtn.Revision(stanza[1]), diff_to_revision, filename)) if diffs: if constrain_diff_to: diffs = [ diffs[0] ] diffs = '| ' + ', '.join([link(d).html('diff') for d in diffs]) else: diffs = '' rv.append((revision, diffs, ago, mtn.Author(author), '<br />\n'.join(changelog), shortlog, when, mtn.File(filename, rev))) return rv def determine_bounds(self, from_change, to_change): per_page = 10 max_per_page = 100 if from_change: from_change = int(from_change) else: from_change = 0 if to_change: to_change = int(to_change) else: to_change = per_page if to_change - from_change > max_per_page: to_change = from_change + max_per_page next_from = to_change next_to = to_change + per_page previous_from = from_change - per_page previous_to = previous_from + per_page return (from_change, to_change, next_from, next_to, previous_from, previous_to) def branch_get_last_changes(self, ops, branch, from_change, to_change): heads = [t for t in ops.heads(branch.name)] if not heads: return web.notfound() changed, new_starting_point = self.__get_last_changes(ops, heads, lambda r: ops.parents(r), lambda r: self.on_our_branch(ops, branch, r), to_change) return changed, new_starting_point def Branch_GET(self, ops, branch, from_change, to_change, template_name): branch = mtn.Branch(branch) from_change, to_change, next_from, next_to, previous_from, previous_to = self.determine_bounds(from_change, to_change) changed, new_starting_point = self.branch_get_last_changes(ops, 
branch, from_change, to_change) changed = changed[from_change:to_change] if len(changed) != to_change - from_change: next_from, next_to = None, None if from_change <= 0: previous_from, previous_to = None, None renderer.render(template_name, page_title="Branch %s" % branch.name, branch=branch, from_change=from_change, to_change=to_change, previous_from=previous_from, previous_to=previous_to, next_from=next_from, next_to=next_to, display_revs=self.for_template(ops, changed)) def file_get_last_changes(self, ops, from_change, to_change, revision, path): def content_changed_fn(start_revision, start_path, in_revision, pathinfo): uniq = set() parents = list(ops.parents(in_revision)) for parent in parents: stanza = list(ops.get_corresponding_path(start_revision, start_path, parent)) # file does not exist in this revision; skip! if not stanza: continue # follow the white rabbit current_path = stanza[0][1] stanza = list(ops.get_content_changed(parent, current_path)) to_add = stanza[0][1] uniq.add(to_add) pathinfo[to_add] = current_path return list(uniq) pathinfo = {} # not just the starting revision! we might not have changed 'path' in the starting rev.. 
start_at = content_changed_fn(revision, path, revision, pathinfo) changed, new_starting_point = self.__get_last_changes(ops, start_at, lambda r: content_changed_fn(revision, path, r, pathinfo), lambda r: True, to_change) return changed, new_starting_point, pathinfo def File_GET(self, ops, from_change, to_change, revision, path, template_name): from_change, to_change, next_from, next_to, previous_from, previous_to = self.determine_bounds(from_change, to_change) changed, new_starting_point, pathinfo = self.file_get_last_changes(ops, from_change, to_change, revision, path) changed = changed[from_change:to_change] if len(changed) != to_change - from_change: next_from, next_to = None, None if from_change <= 0: previous_from, previous_to = None, None renderer.render(template_name, page_title="Changes to '%s' (from %s)" % (cgi.escape(path), revision.abbrev()), filename=mtn.File(path, revision), revision=revision, from_change=from_change, to_change=to_change, previous_from=previous_from, previous_to=previous_to, next_from=next_from, next_to=next_to, display_revs=self.for_template(ops, changed, pathinfo=pathinfo, constrain_diff_to=revision)) class HTMLBranchChanges(Changes): def GET(self, ops, branch, from_change, to_change): Changes.Branch_GET(self, ops, branch, from_change, to_change, "branchchanges.html") class RSSBranchChanges(Changes): def GET(self, ops, branch, from_change, to_change): Changes.Branch_GET(self, ops, branch, from_change, to_change, "branchchangesrss.html") class RevisionPage(object): def get_fileid(self, ops, revision, filename): rv = None for stanza in ops.get_manifest_of(revision): if stanza[0] != 'file': continue if stanza[1] == filename: rv = stanza[3] return rv def exists(self, ops, revision): try: certs = [t for t in ops.certs(revision)] return True except mtn.MonotoneException: return False def branches_for_rev(self, ops, revisions_val): rv = [] for stanza in ops.certs(revisions_val): if stanza[4] == 'name' and stanza[5] == 'branch': 
rv.append(stanza[7]) return rv class RevisionFileChanges(Changes, RevisionPage): def GET(self, ops, from_change, to_change, revision, path): revision = mtn.Revision(revision) if not self.exists(ops, revision): return web.notfound() Changes.File_GET(self, ops, from_change, to_change, revision, path, "revisionfilechanges.html") class RevisionFileChangesRSS(Changes, RevisionPage): def GET(self, ops, from_change, to_change, revision, path): revision = mtn.Revision(revision) if not self.exists(ops, revision): return web.notfound() Changes.File_GET(self, ops, from_change, to_change, revision, path, "revisionfilechangesrss.html") class RevisionInfo(RevisionPage): def GET(self, ops, revision): revision = mtn.Revision(revision) if not self.exists(ops, revision): return web.notfound() certs = ops.certs(revision) revisions = ops.get_revision(revision) output_png, output_imagemap = ancestry_graph(ops, revision) if os.access(output_imagemap, os.R_OK): imagemap = open(output_imagemap).read().replace('\\n', ' by ') imageuri = dynamic_join('revision/graph/' + revision) else: imagemap = imageuri = None renderer.render('revisioninfo.html', page_title="Revision %s" % revision.abbrev(), revision=revision, certs=certs_for_template(certs), imagemap=imagemap, imageuri=imageuri, revisions=revisions_for_template(revision, revisions)) class RevisionDiff(RevisionPage): def GET(self, ops, revision_from, revision_to, filename=None): revision_from = mtn.Revision(revision_from) revision_to = mtn.Revision(revision_to) if not self.exists(ops, revision_from): return web.notfound() if not self.exists(ops, revision_to): return web.notfound() if filename != None: files = [filename] else: files = [] diff = ops.diff(revision_from, revision_to, files) diff_obj = Diff(revision_from, revision_to, files) renderer.render('revisiondiff.html', page_title="Diff from %s to %s" % (revision_from.abbrev(), revision_to.abbrev()), revision=revision_from, revision_from=revision_from, revision_to=revision_to, 
diff=syntax.highlight(diff, 'diff'), diff_obj=diff_obj, files=files) class RevisionRawDiff(RevisionPage): def GET(self, ops, revision_from, revision_to, filename=None): revision_from = mtn.Revision(revision_from) revision_to = mtn.Revision(revision_to) if not self.exists(ops, revision_from): return web.notfound() if not self.exists(ops, revision_to): return web.notfound() if filename != None: files = [filename] else: files = [] diff = ops.diff(revision_from, revision_to, files) web.header('Content-Type', 'text/x-diff') for line in diff: sys.stdout.write (line) sys.stdout.flush() class RevisionFile(RevisionPage): def GET(self, ops, revision, filename): revision = mtn.Revision(revision) if not self.exists(ops, revision): return web.notfound() language = filename.rsplit('.', 1)[-1] fileid = RevisionPage.get_fileid(self, ops, revision, filename) if not fileid: return web.notfound() contents = ops.get_file(fileid) mimetype = mimehelp.lookup(filename, '') mime_to_template = { 'image/jpeg' : 'revisionfileimg.html', 'image/png' : 'revisionfileimg.html', 'image/gif' : 'revisionfileimg.html', 'image/svg+xml' : 'revisionfileobj.html', 'application/pdf' : 'revisionfileobj.html', 'application/x-python' : 'revisionfiletxt.html', 'application/x-perl' : 'revisionfiletxt.html', } template = mime_to_template.get(mimetype, None) if not template: if mimetype.startswith('text/'): template = 'revisionfiletxt.html' else: template = 'revisionfilebin.html' renderer.render(template, filename=mtn.File(filename, revision), page_title="File %s in revision %s" % (filename, revision.abbrev()), revision=revision, mimetype=mimetype, contents=syntax.highlight(contents, language)) class RevisionDownloadFile(RevisionPage): def GET(self, ops, revision, filename): web.header('Content-Disposition', 'attachment; filename=%s' % filename) revision = mtn.Revision(revision) if not self.exists(ops, revision): return web.notfound() fileid = RevisionPage.get_fileid(self, ops, revision, filename) if not fileid: 
return web.notfound() for idx, data in enumerate(ops.get_file(fileid)): if idx == 0: mimetype = mimehelp.lookup(filename, data) web.header('Content-Type', mimetype) sys.stdout.write(data) sys.stdout.flush() class RevisionTar(RevisionPage): def GET(self, ops, revision): # we'll output in the USTAR tar format; documentation taken from: # http://en.wikipedia.org/wiki/Tar_%28file_format%29 revision = mtn.Revision(revision) if not self.exists(ops, revision): return web.notfound() filename = "%s.tar" % revision web.header('Content-Disposition', 'attachment; filename=%s' % filename) web.header('Content-Type', 'application/x-tar') manifest = [stanza for stanza in ops.get_manifest_of(revision)] # for now; we might want to come up with something more interesting; # maybe the branch name (but there might be multiple branches?) basedirname = revision tarobj = tarfile.open(name=filename, mode="w", fileobj=sys.stdout) dir_mode, file_mode = "0700", "0600" certs = {} for stanza in manifest: stanza_type = stanza[0] if stanza_type != 'file': continue filename, fileid = stanza[1], stanza[3] filecontents = cStringIO.StringIO() filesize = 0 for data in ops.get_file(fileid): filesize += len(data) filecontents.write(data) ti = tarfile.TarInfo() ti.name = os.path.join(revision, filename) ti.mode, ti.type = 00600, tarfile.REGTYPE ti.uid = ti.gid = 0 # determine the most recent of the content marks content_marks = [t[1] for t in ops.get_content_changed(revision, filename)] if len(content_marks) > 0: # just pick one to make this faster content_mark = content_marks[0] since_epoch = timecert