Below is the file 'database.cc' from this revision. You can also download the file.

// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*-
// copyright (C) 2002, 2003 graydon hoare <graydon@pobox.com>
// all rights reserved.
// licensed to the public under the terms of the GNU GPL (>= 2)
// see the file COPYING for details

#include <fstream>
#include <iterator>
#include <list>
#include <set>
#include <sstream>
#include <vector>

#include <stdarg.h>
#include <string.h>

#include <boost/shared_ptr.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/operations.hpp>

#include <sqlite3.h>

#include "app_state.hh"
#include "cert.hh"
#include "cleanup.hh"
#include "constants.hh"
#include "database.hh"
#include "keys.hh"
#include "sanity.hh"
#include "schema_migration.hh"
#include "transforms.hh"
#include "ui.hh"
#include "vocab.hh"
#include "xdelta.hh"
#include "epoch.hh"

// defined in schema.sql, converted to header:
#include "schema.h"

// defined in views.sql, converted to header:
#include "views.h"

// this file defines a public, typed interface to the database.
// the database class encapsulates all knowledge about sqlite,
// the schema, and all SQL statements used to access the schema.
//
// see file schema.sql for the text of the schema.

using boost::shared_ptr;
using boost::lexical_cast;
using namespace std;

// expected result-set shapes passed to database::fetch() as want_cols /
// want_rows. fetch() treats the any_* values as "do not check this
// dimension"; the one_* values request exactly one column / row.
int const one_row = 1;
int const one_col = 1;
int const any_rows = -1;
int const any_cols = -1;

extern "C" {
// some wrappers to ease migration: printf-style front ends which format
// the statement text with sqlite3_vmprintf and then call the plain
// sqlite3 function of the same name. all are defined later in this file.
  int sqlite3_exec_printf(sqlite3*,const char *sqlFormat,sqlite3_callback,
                          void *,char **errmsg,...);
  int sqlite3_exec_vprintf(sqlite3*,const char *sqlFormat,sqlite3_callback,
                           void *,char **errmsg,va_list ap);
  int sqlite3_get_table_vprintf(sqlite3*,const char *sqlFormat,char ***resultp,
                                int *nrow,int *ncolumn,char **errmsg,va_list ap);
  // sqlite3_value_text with its result cast to plain char const *
  const char *sqlite3_value_text_s(sqlite3_value *v);
}

// variadic front end for sqlite3_exec_vprintf: collects the arguments
// following errmsg into a va_list and forwards everything unchanged.
// returns whatever sqlite3_exec_vprintf returns.
int sqlite3_exec_printf(sqlite3 * db,
                        char const * sqlFormat,
                        sqlite3_callback cb,
                        void * user_data,
                        char ** errmsg,
                        ...)
{
  va_list args;
  va_start(args, errmsg);
  int const rc = sqlite3_exec_vprintf(db, sqlFormat, cb,
                                      user_data, errmsg, args);
  va_end(args);
  return rc;
}

// format sqlFormat with the arguments in ap (sqlite3_vmprintf rules, so
// %q and friends are available) and run the resulting text through
// sqlite3_exec.
//
// returns the sqlite3_exec result code, or SQLITE_NOMEM if the statement
// text could not be allocated. in the latter case nothing is executed and
// *errmsg (when errmsg is given) is set to NULL.
int sqlite3_exec_vprintf(sqlite3 * db,
                         char const * sqlFormat,
                         sqlite3_callback cb,
                         void * user_data,
                         char ** errmsg,
                         va_list ap)
{
  char * formatted = sqlite3_vmprintf(sqlFormat, ap);
  if (formatted == NULL)
    {
      // handing a NULL statement to sqlite3_exec would hide the
      // allocation failure from the caller, so report it here instead
      if (errmsg != NULL)
        *errmsg = NULL;
      return SQLITE_NOMEM;
    }
  int result = sqlite3_exec(db, formatted, cb,
                            user_data, errmsg);
  sqlite3_free(formatted);
  return result;
}

// format sqlFormat with the arguments in ap (sqlite3_vmprintf rules) and
// run the resulting text through sqlite3_get_table.
//
// returns the sqlite3_get_table result code, or SQLITE_NOMEM if the
// statement text could not be allocated. in the latter case no query is
// run and the output parameters that were supplied are set to an empty
// result (NULL table, 0 rows, 0 columns, NULL message).
int sqlite3_get_table_vprintf(sqlite3 * db,
                              char const * sqlFormat,
                              char *** resultp,
                              int * nrow,
                              int * ncolumn,
                              char ** errmsg,
                              va_list ap)
{
  char * formatted = sqlite3_vmprintf(sqlFormat, ap);
  if (formatted == NULL)
    {
      // do not pass a NULL statement on; report the allocation failure
      if (resultp != NULL)
        *resultp = NULL;
      if (nrow != NULL)
        *nrow = 0;
      if (ncolumn != NULL)
        *ncolumn = 0;
      if (errmsg != NULL)
        *errmsg = NULL;
      return SQLITE_NOMEM;
    }
  int result = sqlite3_get_table(db, formatted, resultp,
                                 nrow, ncolumn, errmsg);
  sqlite3_free(formatted);
  return result;
}

// construct a database object for the file fn. nothing is opened here;
// the sqlite handle is created lazily by sql().
database::database(fs::path const & fn) :
  filename(fn),
  // nb. update this if you change the schema. unfortunately we are not
  // using self-digesting schemas due to comment irregularities and
  // non-alphabetic ordering of tables in sql source files. we could create
  // a temporary db, write our intended schema into it, and read it back,
  // but this seems like it would be too rude. possibly revisit this issue.
  schema("e372b508bea9b991816d1c74680f7ae10d2a6d94"),
  __sql(NULL),
  transaction_level(0)
{
  // sql() hands __app to install_functions(); make sure it holds a
  // well-defined value even if set_app() is never called. assigned in the
  // body rather than the initializer list so that this does not depend on
  // the member declaration order in the header.
  __app = NULL;
}

// compare the schema id calculated from the open database (__sql)
// against the id this binary was built for (the 'schema' member); the
// N() check fails with a hint to migrate when they differ.
void
database::check_schema()
{
  string db_schema_id;
  calculate_schema_id (__sql, db_schema_id);
  N (schema == db_schema_id,
     F("database schemas do not match: "
       "wanted %s, got %s. try migrating database")
     % schema % db_schema_id);
}

// sqlite3_value_text hands back a const unsigned char *, while nearly
// every caller here wants a plain char const * (e.g. to build a
// std::string). this wrapper performs that pointer conversion and
// nothing else; the returned pointer is exactly what sqlite returned.
const char *
sqlite3_value_text_s(sqlite3_value *v)
{
  unsigned char const * raw = sqlite3_value_text(v);
  return reinterpret_cast<char const *>(raw);
}

// sql function unbase64(x): sets the result to the base64-decoded bytes
// of its single text argument, as a blob.
//
// a wrong argument count is reported as an sql error. an sql NULL
// argument yields an sql NULL result: sqlite3_value_text returns a NULL
// pointer in that case, which must not be fed to the string constructor.
static void
sqlite3_unbase64_fn(sqlite3_context *f, int nargs, sqlite3_value ** args)
{
  if (nargs != 1)
    {
      sqlite3_result_error(f, "need exactly 1 arg to unbase64()", -1);
      return;
    }
  char const * encoded = sqlite3_value_text_s(args[0]);
  if (encoded == NULL)
    {
      sqlite3_result_null(f);
      return;
    }
  data decoded;
  decode_base64(base64<data>(string(encoded)), decoded);
  // SQLITE_TRANSIENT: sqlite copies the bytes before 'decoded' goes away
  sqlite3_result_blob(f, decoded().c_str(), decoded().size(), SQLITE_TRANSIENT);
}

// sql function unpack(x): treats its single text argument as
// base64-encoded gzip data and sets the result to the unpacked bytes, as
// a blob.
//
// a wrong argument count is reported as an sql error. an sql NULL
// argument yields an sql NULL result: sqlite3_value_text returns a NULL
// pointer in that case, which must not be fed to the string constructor.
static void
sqlite3_unpack_fn(sqlite3_context *f, int nargs, sqlite3_value ** args)
{
  if (nargs != 1)
    {
      sqlite3_result_error(f, "need exactly 1 arg to unpack()", -1);
      return;
    }
  char const * packed = sqlite3_value_text_s(args[0]);
  if (packed == NULL)
    {
      sqlite3_result_null(f);
      return;
    }
  data unpacked;
  unpack(base64< gzip<data> >(string(packed)), unpacked);
  // SQLITE_TRANSIENT: sqlite copies the bytes before 'unpacked' goes away
  sqlite3_result_blob(f, unpacked().c_str(), unpacked().size(), SQLITE_TRANSIENT);
}

// remember the application state; sql() later passes it on to
// install_functions() when the database is opened.
void
database::set_app(app_state * app)
{
  this->__app = app;
}

// if filename exists, verify that it is a regular sqlite version 3
// database by comparing the start of the file with the sqlite 3 magic
// string. a missing file is fine (nothing to check); a directory, an
// unreadable file, or a file that is too short or starts with anything
// else fails the corresponding N() check.
static void
check_sqlite_format_version(fs::path const & filename)
{
  if (fs::exists(filename))
    {
      N(!fs::is_directory(filename),
        F("database %s is a directory\n") % filename.string());

      // sqlite 3 files begin with this constant string
      // (version 2 files begin with a different one)
      std::string version_string("SQLite format 3");

      // we compare raw bytes, so open the file in binary mode
      std::ifstream file(filename.string().c_str(),
                         std::ios_base::in | std::ios_base::binary);
      N(file, F("unable to probe database version in file %s") % filename.string());

      for (std::string::const_iterator i = version_string.begin();
           i != version_string.end(); ++i)
        {
          // a failed get() (e.g. the file is shorter than the magic
          // string) does not store anything into c, so give c a defined
          // value and check the stream state along with the byte.
          char c = '\0';
          file.get(c);
          N(file && c == *i,
            F("database %s is not an sqlite version 3 file, "
              "try dump and reload") % filename.string());
        }
    }
}


struct sqlite3 *
database::sql(bool init)
{
  if (! __sql)
    {
      N(!filename.empty(), F("no database specified"));

      if (! init)
        {
          N(fs::exists(filename),
            F("database %s does not exist") % filename.string());
          N(!fs::is_directory(filename),
            F("database %s is a directory") % filename.string());
        }

      check_sqlite_format_version(filename);
      int error;
      error = sqlite3_open(filename.string().c_str(), &__sql);
      if (error)
        throw oops(string("could not open database: ") + filename.string() +
                   (": " + string(sqlite3_errmsg(__sql))));
      if (init)
        execute(schema_constant);

      check_schema();
      install_functions(__app);
      install_views();
    }
  return __sql;
}

void
database::initialize()
{
  if (__sql)
    throw oops("cannot initialize database while it is open");

  N(!fs::exists(filename),
    F("could not initialize database: %s: already exists")
    % filename.string());

  fs::path journal = mkpath(filename.string() + "-journal");
  N(!fs::exists(journal),
    F("existing (possibly stale) journal file '%s' "
      "has same stem as new database '%s'")
    % journal.string() % filename.string());

  sqlite3 *s = sql(true);
  I(s != NULL);
}


// state shared between database::dump() and its sqlite callbacks.
//   sql        - handle used for the nested per-table SELECT
//   table_name - table whose rows are currently being written
//   out        - stream receiving the dump text
// the pointers start out NULL so that the invariant checks in the
// callbacks reliably catch a request that was not filled in.
struct
dump_request
{
  dump_request() : sql(NULL), out(NULL) {}
  struct sqlite3 *sql;
  std::string table_name;
  std::ostream *out;
};

static int
dump_row_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(vals != NULL);
  I(dump->out != NULL);
  *(dump->out) << F("INSERT INTO %s VALUES(") % dump->table_name;
  for (int i = 0; i < n; ++i)
    {
      if (i != 0)
        *(dump->out) << ',';

      if (vals[i] == NULL)
        *(dump->out) << "NULL";
      else
        {
          *(dump->out) << "'";
          for (char *cp = vals[i]; *cp; ++cp)
            {
              if (*cp == '\'')
                *(dump->out) << "''";
              else
                *(dump->out) << *cp;
            }
          *(dump->out) << "'";
        }
    }
  *(dump->out) << ");\n";
  return 0;
}

static int
dump_table_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(vals != NULL);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(n == 3);
  if (string(vals[1]) == "table")
    {
      *(dump->out) << vals[2] << ";\n";
      dump->table_name = string(vals[0]);
      sqlite3_exec_printf(dump->sql, "SELECT * FROM '%q'",
                         dump_row_cb, data, NULL, vals[0]);
    }
  return 0;
}

// write the database to 'out' as sql text: a BEGIN TRANSACTION line,
// then whatever dump_table_cb emits for each table listed in
// sqlite_master, then a COMMIT line.
void
database::dump(ostream & out)
{
  // all table definitions recorded in sqlite_master
  char const * const list_tables =
    "SELECT name, type, sql FROM sqlite_master "
    "WHERE type='table' AND sql NOT NULL "
    "ORDER BY substr(type,2,1), name";

  dump_request req;
  req.sql = sql();
  req.out = &out;

  out << "BEGIN TRANSACTION;\n";
  int res = sqlite3_exec(req.sql, list_tables, dump_table_cb, &req, NULL);
  I(res == SQLITE_OK);
  out << "COMMIT;\n";
}

// create a new database file at 'filename' and execute the complete sql
// text read from 'in' against it (the counterpart of dump()).
//
// fails the N() checks when no filename is set or the file already
// exists; throws oops when sqlite cannot open the file, after releasing
// the half-open handle so that __sql is not left pointing at it.
void
database::load(istream & in)
{
  char buf[constants::bufsz];
  string tmp;

  N(filename.string() != "",
    F("need database name"));
  N(!fs::exists(filename),
    F("cannot create %s; it already exists\n") % filename.string());
  int error = sqlite3_open(filename.string().c_str(), &__sql);
  if (error)
    {
      // fetch the message before releasing the handle
      string const why(sqlite3_errmsg(__sql));
      sqlite3_close(__sql);
      __sql = NULL;
      throw oops(string("could not open database: ") + filename.string() +
                 (": " + why));
    }

  // slurp the whole input
  while(in)
    {
      in.read(buf, constants::bufsz);
      tmp.append(buf, in.gcount());
    }

  // "%s" construction prevents interpretation of %-signs in the loaded
  // text as formatting commands.
  execute("%s", tmp.c_str());
}


// run an arbitrary query and print it, the number of rows it produced,
// and then each row on its own line with columns separated by " | ".
void
database::debug(string const & sql, ostream & out)
{
  results res;
  // the query text goes through "%s" so that %-signs inside it are not
  // taken as formatting commands by fetch().
  fetch(res, any_cols, any_rows, "%s", sql.c_str());

  out << "'" << sql << "' -> " << res.size() << " rows\n" << endl;

  for (size_t row = 0; row < res.size(); ++row)
    {
      size_t const width = res[row].size();
      for (size_t col = 0; col < width; ++col)
        {
          if (col > 0)
            out << " | ";
          out << res[row][col];
        }
      out << endl;
    }
}

// print a summary of the database to 'out': the schema id, the number of
// rows in each reported table, the number of bytes stored in each
// reported table, and the total of those byte counts.
void
database::info(ostream & out)
{
  // label / table pairs for the "counts" section, in output order
  struct counted
  {
    char const * label;
    char const * table;
  };
  static counted const counts[] =
    {
      { "  full manifests  : ", "manifests" },
      { "  manifest deltas : ", "manifest_deltas" },
      { "  full files      : ", "files" },
      { "  file deltas     : ", "file_deltas" },
      { "  revisions       : ", "revisions" },
      { "  ancestry edges  : ", "revision_ancestry" },
      { "  certs           : ", "revision_certs" }
    };

  // label / table / measured columns for the "bytes" section.
  // FIXME: surely there is a less lame way to do this, that doesn't require
  // updating every time the schema changes?
  struct measured
  {
    char const * label;
    char const * table;
    char const * columns;
  };
  static measured const sizes[] =
    {
      { "  full manifests  : ", "manifests", "id || data" },
      { "  manifest deltas : ", "manifest_deltas", "id || base || delta" },
      { "  full files      : ", "files", "id || data" },
      { "  file deltas     : ", "file_deltas", "id || base || delta" },
      { "  revisions       : ", "revisions", "id || data" },
      { "  cached ancestry : ", "revision_ancestry", "parent || child" },
      { "  certs           : ", "revision_certs",
        "hash || id || name || value || keypair || signature" }
    };

  string id;
  calculate_schema_id(sql(), id);
  out << "schema version    : " << id << endl;

  out << "counts:" << endl;
  for (size_t k = 0; k < sizeof(counts) / sizeof(counts[0]); ++k)
    out << counts[k].label << count(counts[k].table) << endl;

  out << "bytes:" << endl;
  unsigned long total = 0;
  for (size_t k = 0; k < sizeof(sizes) / sizeof(sizes[0]); ++k)
    {
      unsigned long const bytes = space_usage(sizes[k].table, sizes[k].columns);
      total += bytes;
      out << sizes[k].label << bytes << endl;
    }

  out << "  total           : " << total << endl;
}

// print the schema id of the database file without checking it against
// the schema this binary expects. opens the file directly, computes the
// id, and closes the handle again.
//
// __sql is reset after every close: the destructor closes any non-NULL
// __sql, so leaving the stale pointer behind would close the handle
// twice. throws oops when sqlite cannot open the file.
void
database::version(ostream & out)
{
  string id;
  N(filename.string() != "",
    F("need database name"));
  int error = sqlite3_open(filename.string().c_str(), &__sql);
  if (error)
    {
      // the message lives in the handle; copy it before closing
      string const why(sqlite3_errmsg(__sql));
      sqlite3_close(__sql);
      __sql = NULL;
      throw oops(string("could not open database: ") + filename.string() +
                 (": " + why));
    }
  calculate_schema_id(__sql, id);
  sqlite3_close(__sql);
  __sql = NULL;
  out << "database schema version: " << id << endl;
}

void
database::migrate()
{
  N(filename.string() != "",
    F("need database name"));
  int error = sqlite3_open(filename.string().c_str(), &__sql);
  if (error)
    {
      sqlite3_close(__sql);
      throw oops(string("could not open database: ") + filename.string() +
                 (": " + string(sqlite3_errmsg(__sql))));
    }
  migrate_monotone_schema(__sql);
  sqlite3_close(__sql);
}

void
database::rehash()
{
  transaction_guard guard(*this);
  ticker mcerts("mcerts", "m", 1);
  ticker pubkeys("pubkeys", "+", 1);
  ticker privkeys("privkeys", "!", 1);

  {
    // rehash all mcerts
    results res;
    vector<cert> certs;
    fetch(res, 5, any_rows,
          "SELECT id, name, value, keypair, signature "
          "FROM manifest_certs");
    results_to_certs(res, certs);
    execute("DELETE FROM manifest_certs");
    for(vector<cert>::const_iterator i = certs.begin(); i != certs.end(); ++i)
      {
        put_cert(*i, "manifest_certs");
        ++mcerts;
      }
  }

  {
    // rehash all pubkeys
    results res;
    fetch(res, 2, any_rows, "SELECT id, keydata FROM public_keys");
    execute("DELETE FROM public_keys");
    for (size_t i = 0; i < res.size(); ++i)
      {
        hexenc<id> tmp;
        key_hash_code(rsa_keypair_id(res[i][0]), base64<rsa_pub_key>(res[i][1]), tmp);
        execute("INSERT INTO public_keys VALUES('%q', '%q', '%q')",
                tmp().c_str(), res[i][0].c_str(), res[i][1].c_str());
        ++pubkeys;
      }
  }

  {
    // rehash all privkeys
    results res;
    fetch(res, 2, any_rows, "SELECT id, keydata FROM private_keys");
    execute("DELETE FROM private_keys");
    for (size_t i = 0; i < res.size(); ++i)
      {
        hexenc<id> tmp;
        key_hash_code(rsa_keypair_id(res[i][0]), base64< arc4<rsa_priv_key> >(res[i][1]), tmp);
        execute("INSERT INTO private_keys VALUES('%q', '%q', '%q')",
                tmp().c_str(), res[i][0].c_str(), res[i][1].c_str());
        ++privkeys;
      }
  }

  guard.commit();
}

// force the database open: sql() opens it when no handle exists yet (and
// fails there if it cannot); the returned handle must be non-NULL.
void
database::ensure_open()
{
  sqlite3 *s = sql();
  I(s != NULL);
}

// close the sqlite handle, if one is open, and forget it.
database::~database()
{
  if (! __sql)
    return;

  sqlite3_close(__sql);
  __sql = 0;
}

// turn an sqlite result code into an exception: SQLITE_OK returns
// normally, every other code throws oops carrying a fixed description of
// that code. codes without an entry below are reported by number.
static void
assert_sqlite3_ok(int res)
{
  if (res == SQLITE_OK)
    return;

  // pick the description; the single throw happens after the switch
  char const * why = NULL;
  switch (res)
    {
    case SQLITE_ERROR:
      why = "SQL error or missing database";
      break;
    case SQLITE_INTERNAL:
      why = "An internal logic error in SQLite";
      break;
    case SQLITE_PERM:
      why = "Access permission denied";
      break;
    case SQLITE_ABORT:
      why = "Callback routine requested an abort";
      break;
    case SQLITE_BUSY:
      why = "The database file is locked";
      break;
    case SQLITE_LOCKED:
      why = "A table in the database is locked";
      break;
    case SQLITE_NOMEM:
      why = "A malloc() failed";
      break;
    case SQLITE_READONLY:
      why = "Attempt to write a readonly database";
      break;
    case SQLITE_INTERRUPT:
      why = "Operation terminated by sqlite3_interrupt()";
      break;
    case SQLITE_IOERR:
      why = "Some kind of disk I/O error occurred";
      break;
    case SQLITE_CORRUPT:
      why = "The database disk image is malformed";
      break;
    case SQLITE_NOTFOUND:
      why = "(Internal Only) Table or record not found";
      break;
    case SQLITE_FULL:
      why = "Insertion failed because database (or filesystem) is full";
      break;
    case SQLITE_CANTOPEN:
      why = "Unable to open the database file";
      break;
    case SQLITE_PROTOCOL:
      why = "database lock protocol error";
      break;
    case SQLITE_EMPTY:
      why = "(Internal Only) database table is empty";
      break;
    case SQLITE_SCHEMA:
      why = "The database schema changed";
      break;
    case SQLITE_TOOBIG:
      why = "Too much data for one row of a table";
      break;
    case SQLITE_CONSTRAINT:
      why = "Abort due to contraint violation";
      break;
    case SQLITE_MISMATCH:
      why = "Data type mismatch";
      break;
    case SQLITE_MISUSE:
      why = "Library used incorrectly";
      break;
    case SQLITE_NOLFS:
      why = "Uses OS features not supported on host";
      break;
    case SQLITE_AUTH:
      why = "Authorization denied";
      break;
    default:
      break;
    }

  if (why == NULL)
    throw oops(string("Unknown DB result code: ") + lexical_cast<string>(res));

  throw oops(string(why));
}

void
database::execute(char const * query, ...)
{
  va_list ap;

  va_start(ap, query);

  // log it
  char * formatted = sqlite3_vmprintf(query, ap);
  string qq;

  if (strlen(formatted) > constants::db_log_line_sz)
    {
      qq.assign(formatted, constants::db_log_line_sz);
      qq.append(" ...");
    }
  else
    {
      qq = formatted;
    }
  L(F("db.execute(\"%s\")\n") % qq);

  // do it
  char * errmsg = NULL;
  int res = sqlite3_exec(sql(), formatted, NULL, NULL, &errmsg);
  sqlite3_free(formatted);

  va_end(ap);

  if (errmsg)
    throw oops(string("sqlite exec error ") + errmsg);

  assert_sqlite3_ok(res);

}

// format 'query' with the trailing arguments (sqlite3_vmprintf rules),
// log a possibly truncated copy, run it, and store the resulting rows in
// 'res' as strings (one vector per row; column names are not stored).
//
// want_cols / want_rows give the expected shape of the result; any_cols
// / any_rows disable the respective check. an empty result is accepted
// without further checks when the corresponding expectation is 0 or
// "any".
//
// throws oops on any sqlite error, on a shape mismatch, and when the
// result contains a NULL column name or NULL value.
void
database::fetch(results & res,
                int const want_cols,
                int const want_rows,
                char const * query, ...)
{
  char ** result = NULL;
  int nrow = 0;
  int ncol = 0;
  char * errmsg = NULL;
  int rescode;

  res.clear();

  // get the handle first: sql() may throw, and doing that while we own
  // the formatted buffer would leak it
  sqlite3 * db = sql();

  // format the statement once; the same text is logged and executed
  va_list ap;
  va_start(ap, query);
  char * formatted = sqlite3_vmprintf(query, ap);
  va_end(ap);

  // sqlite3_vmprintf returns NULL when it cannot allocate the text
  if (formatted == NULL)
    assert_sqlite3_ok(SQLITE_NOMEM);

  // log it
  string qq(formatted);
  if (qq.size() > constants::log_line_sz)
    qq = qq.substr(0, constants::log_line_sz) + string(" ...");
  L(F("db.fetch(\"%s\")\n") % qq);

  // do it
  rescode = sqlite3_get_table(db, formatted, &result, &nrow, &ncol, &errmsg);
  sqlite3_free(formatted);

  // frees the result table on every exit path below
  cleanup_ptr<char **, void>
    result_guard(result, &sqlite3_free_table);

  string ctx = string("db query [") + string(query) + "]: ";

  if (errmsg)
    {
      // the message buffer belongs to us; copy it and give it back to
      // sqlite before throwing
      string const why(errmsg);
      sqlite3_free(errmsg);
      throw oops(ctx + string("sqlite error ") + why);
    }
  assert_sqlite3_ok(rescode);

  if (want_cols == 0 && ncol == 0) return;
  if (want_rows == 0 && nrow == 0) return;
  if (want_cols == any_cols && ncol == 0) return;
  if (want_rows == any_rows && nrow == 0) return;

  if (want_cols != any_cols &&
      ncol != want_cols)
    throw oops((F("%s wanted %d columns, got %s")
                % ctx % want_cols % ncol).str());

  if (want_rows != any_rows &&
      nrow != want_rows)
    throw oops((F("%s wanted %d rows, got %s")
                % ctx % want_rows % nrow).str());

  if (!result)
    throw oops(ctx + "null result set");

  // the first ncol entries of the table are the column names
  for (int i = 0; i < ncol; ++i)
    if (!result[i])
      throw oops(ctx + "null column name");

  for (int row = 0; row < nrow; ++row)
    {
      vector<string> rowvec;
      for (int col = 0; col < ncol; ++col)
        {
          // skip the header row when indexing into the flat table
          int i = ((1 + row) * ncol) + col;
          if (!result[i])
            throw oops(ctx + "null result value");
          else
            rowvec.push_back(result[i]);
        }
      res.push_back(rowvec);
    }
}

// general application-level logic

// point this object at a different database file. only allowed while no
// handle is open; otherwise throws oops.
void
database::set_filename(fs::path const & file)
{
  if (__sql)
    throw oops("cannot change filename to " + file.string() + " while db is open");

  filename = file;
}

// enter a (possibly nested) transaction. only the outermost level issues
// BEGIN; every call increases the nesting level.
void
database::begin_transaction()
{
  bool const outermost = (transaction_level == 0);
  if (outermost)
    {
      execute("BEGIN");
    }
  ++transaction_level;
}

// leave one transaction level. COMMIT is issued only when the outermost
// level is being left; every call decreases the nesting level.
void
database::commit_transaction()
{
  bool const outermost = (transaction_level == 1);
  if (outermost)
    {
      execute("COMMIT");
    }
  --transaction_level;
}

// leave one transaction level, abandoning the work. ROLLBACK is issued
// only when the outermost level is being left; every call decreases the
// nesting level.
void
database::rollback_transaction()
{
  bool const outermost = (transaction_level == 1);
  if (outermost)
    {
      execute("ROLLBACK");
    }
  --transaction_level;
}


// true when 'table' holds a row whose id equals ident. finding more than
// one such row is an invariant failure.
bool
database::exists(hexenc<id> const & ident,
                      string const & table)
{
  results res;
  fetch(res, one_col, any_rows,
        "SELECT id FROM '%q' WHERE id = '%q'",
        table.c_str(), ident().c_str());

  I((res.size() == 1) || (res.size() == 0));

  // past the check above the size is either 0 or 1
  return res.size() != 0;
}


// true when 'table' holds at least one delta row whose id equals ident.
// unlike exists(), several matching rows are acceptable here.
bool
database::delta_exists(hexenc<id> const & ident,
                       string const & table)
{
  results res;
  fetch(res, one_col, any_rows,
        "SELECT id FROM '%q' WHERE id = '%q'",
        table.c_str(), ident().c_str());

  bool const found = (res.size() != 0);
  return found;
}

// true when 'table' holds the delta row with the given id and base.
// finding more than one such row is an invariant failure.
bool
database::delta_exists(hexenc<id> const & ident,
                       hexenc<id> const & base,
                       string const & table)
{
  results res;
  fetch(res, one_col, any_rows,
        "SELECT id FROM '%q' WHERE id = '%q' AND base = '%q'",
        table.c_str(), ident().c_str(), base().c_str());

  I((res.size() == 1) || (res.size() == 0));

  // past the check above the size is either 0 or 1
  return res.size() != 0;
}

// number of rows in 'table', as reported by COUNT(*).
unsigned long
database::count(string const & table)
{
  results res;
  fetch(res, one_col, one_row,
        "SELECT COUNT(*) FROM '%q'",
        table.c_str());

  unsigned long const rows = lexical_cast<unsigned long>(res[0][0]);
  return rows;
}

// total length of the expression 'concatenated_columns' (an sql
// expression such as "id || data", inserted into the query verbatim)
// summed over all rows of 'table'. returns 0 for an empty table.
unsigned long
database::space_usage(string const & table, string const & concatenated_columns)
{
  results res;
  // COALESCE is required since SUM over an empty set is NULL, and
  // fetch() refuses NULL result values.
  fetch(res, one_col, one_row,
        "SELECT COALESCE(SUM(LENGTH(%s)), 0) FROM '%q'",
        concatenated_columns.c_str(), table.c_str());
  return lexical_cast<unsigned long>(res[0][0]);
}

// add the id of every row in 'table' to 'ids'. entries already present
// in 'ids' are kept.
void
database::get_ids(string const & table, set< hexenc<id> > & ids)
{
  results res;

  // quote the table name like every other query in this file; %q only
  // escapes correctly inside the surrounding quotes
  fetch(res, one_col, any_rows, "SELECT id FROM '%q'", table.c_str());

  for (size_t i = 0; i < res.size(); ++i)
    {
      ids.insert(hexenc<id>(res[i][0]));
    }
}

// load the full data object stored under ident in 'table' into 'dat'.
// the stored text is unpacked, and the identifier calculated from the
// unpacked data must equal ident (invariant) before it is handed out.
void
database::get(hexenc<id> const & ident,
              data & dat,
              string const & table)
{
  results res;
  fetch(res, one_col, one_row,
        "SELECT data FROM '%q' WHERE id = '%q'",
        table.c_str(), ident().c_str());

  // unpack what is stored in the row
  base64<gzip<data> > packed(res[0][0]);
  data unpacked;
  unpack(packed, unpacked);

  // consistency check: the content must hash to the id we asked for
  hexenc<id> tid;
  calculate_ident(unpacked, tid);
  I(tid == ident);

  dat = unpacked;
}

// load the delta stored in 'table' under (ident, base) and unpack it
// into 'del'. both identifiers must be non-empty.
void
database::get_delta(hexenc<id> const & ident,
                    hexenc<id> const & base,
                    delta & del,
                    string const & table)
{
  I(ident() != "");
  I(base() != "");

  results res;
  fetch(res, one_col, one_row,
        "SELECT delta FROM '%q' WHERE id = '%q' AND base = '%q'",
        table.c_str(), ident().c_str(), base().c_str());

  base64<gzip<delta> > packed = res[0][0];
  unpack(packed, del);
}

// store the full data object 'dat' under ident in 'table'. ident must be
// non-empty and must equal the identifier calculated from 'dat'
// (invariants); the data is packed before it is inserted.
void
database::put(hexenc<id> const & ident,
              data const & dat,
              string const & table)
{
  // consistency check: refuse to store data under a foreign id
  I(ident() != "");
  hexenc<id> tid;
  calculate_ident(dat, tid);
  I(tid == ident);

  base64<gzip<data> > packed;
  pack(dat, packed);

  execute("INSERT INTO '%q' VALUES('%q', '%q')",
          table.c_str(), ident().c_str(), packed().c_str());
}
// store the delta 'del' in 'table' as the row (ident, base, delta). both
// identifiers must be non-empty; the delta is packed before it is
// inserted.
void
database::put_delta(hexenc<id> const & ident,
                    hexenc<id> const & base,
                    delta const & del,
                    string const & table)
{
  I(ident() != "");
  I(base() != "");

  base64<gzip<delta> > packed;
  pack(del, packed);

  // nb: the column order of a delta table is (id, base, delta)
  execute("INSERT INTO '%q' VALUES('%q', '%q', '%q')",
          table.c_str(),
          ident().c_str(), base().c_str(), packed().c_str());
}

// static ticker cache_hits("vcache hits", "h", 1);

// in-memory cache of reconstructed versions, keyed by identifier.
//   capacity - byte budget, taken from constants::db_version_cache_sz
//   use      - sum of the data sizes of all entries currently cached
//   cache    - the entries themselves
// when a new entry would exceed the budget, randomly chosen entries are
// dropped until it fits or the cache is empty; a single entry larger
// than the budget is therefore still cached, on its own.
struct version_cache
{
  size_t capacity;
  size_t use;
  std::map<hexenc<id>, data> cache;

  version_cache() : capacity(constants::db_version_cache_sz), use(0)
  {
    // seeds the generator used to pick eviction victims in put()
    srand(time(NULL));
  }

  // add (ident, dat) to the cache, evicting other entries as needed.
  // putting an identifier that is already cached is a no-op: the map
  // would not replace the entry anyway, and counting its size a second
  // time would make 'use' drift away from the real total.
  void put(hexenc<id> const & ident, data const & dat)
  {
    if (cache.find(ident) != cache.end())
      return;

    while (!cache.empty()
           && use + dat().size() > capacity)
      {
        // pick a victim: the first entry at or after a random 40 digit
        // hex key
        std::string key = (F("%08.8x%08.8x%08.8x%08.8x%08.8x")
                           % rand() % rand() % rand() % rand() % rand()).str();
        std::map<hexenc<id>, data>::iterator i;
        i = cache.lower_bound(hexenc<id>(key));
        if (i == cache.end())
          {
            // we can't find a random entry, probably there's only one
            // entry and we missed it. delete first entry instead.
            i = cache.begin();
          }
        I(i != cache.end());
        I(use >= i->second().size());
        L(F("version cache expiring %s\n") % i->first);
        use -= i->second().size();
        cache.erase(i);
      }
    cache.insert(std::make_pair(ident, dat));
    use += dat().size();
  }

  // true when ident is cached
  bool exists(hexenc<id> const & ident)
  {
    std::map<hexenc<id>, data>::const_iterator i;
    i = cache.find(ident);
    return i != cache.end();
  }

  // copy the cached data for ident into dat; returns false (leaving dat
  // alone) when ident is not cached
  bool get(hexenc<id> const & ident, data & dat)
  {
    std::map<hexenc<id>, data>::const_iterator i;
    i = cache.find(ident);
    if (i == cache.end())
      return false;
    // ++cache_hits;
    L(F("version cache hit on %s\n") % ident);
    dat = i->second;
    return true;
  }
};

// the one cache instance shared by all code in this file
static version_cache vcache;

// load the version 'ident' into 'dat'.
//
// three sources are tried in order: the version cache, a full copy in
// data_table, and finally reconstruction from a full copy reachable
// through the deltas in delta_table. a reconstructed result must hash to
// ident (invariant). unless the result came out of the cache, it is put
// into the cache before returning.
void
database::get_version(hexenc<id> const & ident,
                      data & dat,
                      string const & data_table,
                      string const & delta_table)
{
  I(ident() != "");

  if (vcache.get(ident, dat))
    {
      // cache hit: dat is filled in, nothing else to do
      return;
    }
  else if (exists(ident, data_table))
    {
     // easy path
     get(ident, dat, data_table);
    }
  else
    {
      // tricky path

      // we start from the file we want to reconstruct and work *forwards*
      // through the database, until we get to a full data object. we then
      // trace back through the list of edges we followed to get to the data
      // object, applying reverse deltas.
      //
      // the effect of this algorithm is breadth-first search, backwards
      // through the storage graph, to discover a forwards shortest path, and
      // then following that shortest path with delta application.
      //
      // we used to do this with the boost graph library, but it involved
      // loading too much of the storage graph into memory at any moment. this
      // imperative version only loads the descendants of the reconstruction
      // node, so it is much cheaper in terms of memory.
      //
      // we also maintain a cycle-detecting set, just to be safe

      L(F("reconstructing %s in %s\n") % ident % delta_table);
      I(delta_exists(ident, delta_table));

      // nb: an edge map goes in the direction of the
      // delta, *not* the direction we discover things in,
      // i.e. each map is of the form [newid] -> [oldid]

      typedef map< hexenc<id>, hexenc<id> > edgemap;
      // one edge map per search level; push_front below means the level
      // discovered last (nearest the root) ends up first in the list
      list< shared_ptr<edgemap> > paths;

      // frontier: nodes examined in the current level.
      // cycles: every node that has been expanded so far.
      set< hexenc<id> > frontier, cycles;
      frontier.insert(ident);

      bool found_root = false;
      hexenc<id> root("");

      while (! found_root)
        {
          set< hexenc<id> > next_frontier;
          shared_ptr<edgemap> frontier_map(new edgemap());

          // an empty frontier means no full version is reachable
          I(!frontier.empty());

          for (set< hexenc<id> >::const_iterator i = frontier.begin();
               i != frontier.end(); ++i)
            {
              // a node with full data available (cached or stored) ends
              // the search
              if (vcache.exists(*i) || exists(*i, data_table))
                {
                  root = *i;
                  found_root = true;
                  break;
                }
              else
                {
                  cycles.insert(*i);
                  // all bases this node has a delta against
                  results res;
                  fetch(res, one_col, any_rows, "SELECT base from '%q' WHERE id = '%q'",
                        delta_table.c_str(), (*i)().c_str());
                  for (size_t k = 0; k < res.size(); ++k)
                    {
                      hexenc<id> const nxt(res[k][0]);

                      if (cycles.find(nxt) != cycles.end())
                        throw oops("cycle in table '" + delta_table + "', at node "
                                   + (*i)() + " <- " + nxt());

                      next_frontier.insert(nxt);

                      // keep only the first edge found into nxt at this level
                      if (frontier_map->find(nxt) == frontier_map->end())
                        {
                          L(F("inserting edge: %s <- %s\n") % (*i) % nxt);
                          frontier_map->insert(make_pair(nxt, *i));
                        }
                      else
                        L(F("skipping merge edge %s <- %s\n") % (*i) % nxt);
                    }
                }
            }
          // the edges gathered in the level that found the root are not
          // recorded; only completed levels become part of the path
          if (!found_root)
            {
              frontier = next_frontier;
              paths.push_front(frontier_map);
            }
        }

      // path built, now all we need to do is follow it back

      I(found_root);
      I(root() != "");
      data begin;

      // fetch the full data of the root, preferring the cache
      if (vcache.exists(root))
        {
          I(vcache.get(root, begin));
        }
      else
        {
          get(root, begin, data_table);
        }

      hexenc<id> curr = root;

      boost::shared_ptr<delta_applicator> app = new_piecewise_applicator();
      app->begin(begin());

      for (list< shared_ptr<edgemap> >::const_iterator p = paths.begin();
           p != paths.end(); ++p)
        {
          shared_ptr<edgemap> i = *p;
          I(i->find(curr) != i->end());
          hexenc<id> const nxt = i->find(curr)->second;

          // cache the version we are passing through, if it is not
          // cached already
          if (!vcache.exists(curr))
            {
              string tmp;
              app->finish(tmp);
              vcache.put(curr, tmp);
            }


          L(F("following delta %s -> %s\n") % curr % nxt);
          delta del;
          get_delta(nxt, curr, del, delta_table);
          apply_delta (app, del());

          app->next();
          curr = nxt;
        }

      string tmp;
      app->finish(tmp);
      dat = data(tmp);

      // the reconstructed data must hash to the id that was asked for
      hexenc<id> final;
      calculate_ident(dat, final);
      I(final == ident);
    }
  vcache.put(ident, dat);
}


// delete every row of 'table' whose id equals ident.
void
database::drop(hexenc<id> const & ident,
               string const & table)
{
  execute("DELETE FROM '%q' WHERE id = '%q'",
          table.c_str(), ident().c_str());
}

// store new_id as the successor of old_id, given the forward delta 'del'
// from old to new: the new version is stored in full in data_table, the
// old one is kept only as a reverse delta (new -> old) in delta_table.
// the table updates happen inside one transaction.
void
database::put_version(hexenc<id> const & old_id,
                      hexenc<id> const & new_id,
                      delta const & del,
                      string const & data_table,
                      string const & delta_table)
{
  // build the new full text and the delta leading back to the old one
  data old_data;
  get_version(old_id, old_data, data_table, delta_table);

  data new_data;
  patch(old_data, del, new_data);

  delta reverse_delta;
  diff(new_data, old_data, reverse_delta);

  transaction_guard guard(*this);

  // descendent of a head version replaces the head, therefore old head
  // must be disposed of
  bool const old_is_head = exists(old_id, data_table);
  if (old_is_head)
    drop(old_id, data_table);

  put(new_id, new_data, data_table);
  put_delta(old_id, new_id, reverse_delta, delta_table);
  guard.commit();
}

void
database::put_reverse_version(hexenc<id> const & new_id,
                              hexenc<id> const & old_id,
                              delta const & reverse_del,
                              string const & data_table,
                              string const & delta_table)
{
  data old_data, new_data;

  get_version(new_id, new_data, data_table, delta_table