// Below is the file 'database.cc' from this revision. You can also download the file.

// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil; c-basic-offset: 2 -*-
// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:
// copyright (C) 2002, 2003 graydon hoare <graydon@pobox.com>
// copyright (C) 2006 vinzenz feenstra <evilissimo@c-plusplus.de>
// all rights reserved.
// licensed to the public under the terms of the GNU GPL (>= 2)
// see the file COPYING for details


#include <algorithm>
#include <deque>
#include <fstream>
#include <iterator>
#include <list>
#include <set>
#include <sstream>
#include <vector>

#include <string.h>

#include <boost/shared_ptr.hpp>
#include <boost/lexical_cast.hpp>

#include <sqlite3.h>

#include "app_state.hh"
#include "cert.hh"
#include "cleanup.hh"
#include "constants.hh"
#include "database.hh"
#include "hash_map.hh"
#include "keys.hh"
#include "sanity.hh"
#include "schema_migration.hh"
#include "transforms.hh"
#include "ui.hh"
#include "vocab.hh"
#include "xdelta.hh"
#include "epoch.hh"

// defined in schema.sql, converted to header:
#include "schema.h"

// defined in views.sql, converted to header:
#include "views.h"
// this file defines a public, typed interface to the database.
// the database class encapsulates all knowledge about sqlite,
// the schema, and all SQL statements used to access the schema.
//
// see file schema.sql for the text of the schema.

using boost::shared_ptr;
using boost::lexical_cast;
using namespace std;

// values passed as the want_cols / want_rows arguments of
// database::fetch(): the one_* constants demand exactly one column or
// row, the any_* constants switch the corresponding check off.
int const one_row = 1;
int const one_col = 1;
int const any_rows = -1;
int const any_cols = -1;

namespace
{
  // a single value destined for a '?' placeholder of a prepared
  // statement, tagged with the way it has to be bound.
  struct query_param
  {
    enum arg_type { text, blob, integer };
    arg_type type;      // which kind of binding to perform
    std::string data;   // payload of text and blob parameters
    uint32_t number;    // payload of integer parameters
  };

  // build a parameter that is bound as text; number is left at 0.
  query_param
  text(std::string const & txt)
  {
    query_param param;
    param.type = query_param::text;
    param.data = txt;
    param.number = 0;
    return param;
  }

  // build a parameter that is bound as a blob; number is left at 0.
  query_param
  blob(std::string const & blb)
  {
    query_param param;
    param.type = query_param::blob;
    param.data = blb;
    param.number = 0;
    return param;
  }

  // build a parameter that is bound as an integer; data is left empty.
  query_param
  integer(uint32_t const & intg)
  {
    query_param param;
    param.type = query_param::integer;
    param.number = intg;
    return param;
  }

  // set of sqlite handles, kept so that the close_all_databases() handler
  // can reach every open database.  (the code that fills and drains this
  // set is not in this part of the file.)
  set<sqlite3*> sql_contexts;
}

struct query
{
  query(std::string const & cmd)
    : sql_cmd(cmd)
  {}

  query & operator %(query_param const & qp)
  {
    args.push_back(qp);
    return *this;
  }

  std::vector<query_param> args;
  std::string sql_cmd;
};

extern "C" {
// some wrappers to ease migration
// both return the text that sqlite hands out as 'const unsigned char *'
// as a plain 'const char *'; the definitions follow further down in this
// file.
  const char *sqlite3_value_text_s(sqlite3_value *v);
  const char *sqlite3_column_text_s(sqlite3_stmt*, int col);
}

// construct a database object for the file fn.  nothing is opened here:
// __sql starts out NULL and the transaction nesting level is 0.
database::database(system_path const & fn) :
  filename(fn),
  // nb. update this if you change the schema. unfortunately we are not
  // using self-digesting schemas due to comment irregularities and
  // non-alphabetic ordering of tables in sql source files. we could create
  // a temporary db, write our intended schema into it, and read it back,
  // but this seems like it would be too rude. possibly revisit this issue.
  schema("b0987b874c2d348e9720ec11cf9618509b002618"),
  __sql(NULL),
  transaction_level(0)
{}

// true iff file, converted to a system_path, has the same internal
// representation as this database's filename.  a match is logged.
bool
database::is_dbfile(any_path const & file)
{
  system_path fn(file);// why is this needed?
  if (filename.as_internal() != fn.as_internal())
    return false;

  L(FL("'%s' is the database file") % file);
  return true;
}

// compute the schema id of the open database and fail with a user-level
// error (N) unless it equals the id this build expects ('schema').
// uses __sql directly, so the database must already be open.
void
database::check_schema()
{
  string db_schema_id;
  calculate_schema_id (__sql, db_schema_id);
  N (schema == db_schema_id,
     F("layout of database %s doesn't match this version of monotone\n"
       "wanted schema %s, got %s\n"
       "try 'monotone db migrate' to upgrade\n"
       "(this is irreversible; you may want to make a backup copy first)")
     % filename % schema % db_schema_id);
}

// sanity-check the kind of data stored in the database: a database with
// revisions must also have rosters, and a database without revisions must
// not have manifests.  either violation is reported as a user-level error.
void
database::check_format()
{
  results res_revisions;
  fetch(res_revisions, one_col, any_rows,
        query("SELECT 1 FROM revisions LIMIT 1"));

  if (res_revisions.size() == 0)
    {
      // they have no revisions, so they shouldn't have any manifests either.
      // if they do, their db is probably ancient.  (though I guess you could
      // trigger this check by taking a pre-roster monotone, doing "db
      // init; commit; db kill_rev_locally", and then upgrading to a
      // rosterified monotone.)
      results res_manifests;
      fetch(res_manifests, one_col, any_rows,
            query("SELECT 1 FROM manifests LIMIT 1"));
      N(res_manifests.size() == 0,
        F("database %s contains manifests but no revisions\n"
          "this is a very old database; it needs to be upgraded\n"
          "please see README.changesets for details")
        % filename);
      return;
    }

  // they have revisions, so they can't be _ancient_, but they still might
  // not have rosters
  results res_rosters;
  fetch(res_rosters, one_col, any_rows,
        query("SELECT 1 FROM rosters LIMIT 1"));
  N(res_rosters.size() != 0,
    F("database %s contains revisions but no rosters\n"
      "if you are a project leader or doing local testing:\n"
      "  see the file UPGRADE for instructions on upgrading.\n"
      "if you are not a project leader:\n"
      "  wait for a leader to migrate project data, and then\n"
      "  pull into a fresh database.\n"
      "sorry about the inconvenience.")
    % filename);
}

// sqlite3_value_text hands back a 'const unsigned char *', whereas nearly
// every caller here wants a plain 'const char *'; this wrapper does the
// pointer conversion in one place.
const char *
sqlite3_value_text_s(sqlite3_value *v)
{
  return reinterpret_cast<const char *>(sqlite3_value_text(v));
}

// same conversion as sqlite3_value_text_s, for a result column of a
// statement.
const char *
sqlite3_column_text_s(sqlite3_stmt *stmt, int col)
{
  return reinterpret_cast<const char *>(sqlite3_column_text(stmt, col));
}

// sql function body for unbase64(x): decodes its single text argument
// from base64 and returns the result as a blob.  a wrong argument count
// is reported as an sql error; an sql NULL argument yields NULL.
static void
sqlite3_unbase64_fn(sqlite3_context *f, int nargs, sqlite3_value ** args)
{
  if (nargs != 1)
    {
      sqlite3_result_error(f, "need exactly 1 arg to unbase64()", -1);
      return;
    }

  // sqlite3_value_text() returns a null pointer for an sql NULL value;
  // building a std::string from that would be undefined behaviour, so
  // map NULL input to NULL output instead.
  const char * encoded = sqlite3_value_text_s(args[0]);
  if (encoded == NULL)
    {
      sqlite3_result_null(f);
      return;
    }

  data decoded;
  decode_base64(base64<data>(string(encoded)), decoded);
  // SQLITE_TRANSIENT: sqlite copies the bytes before 'decoded' goes away
  sqlite3_result_blob(f, decoded().c_str(), decoded().size(), SQLITE_TRANSIENT);
}

// sql function body for unpack(x): takes its single text argument as
// base64-encoded gzip data, unpacks it and returns the result as a blob.
// a wrong argument count is reported as an sql error; an sql NULL
// argument yields NULL.
static void
sqlite3_unpack_fn(sqlite3_context *f, int nargs, sqlite3_value ** args)
{
  if (nargs != 1)
    {
      sqlite3_result_error(f, "need exactly 1 arg to unpack()", -1);
      return;
    }

  // sqlite3_value_text() returns a null pointer for an sql NULL value;
  // building a std::string from that would be undefined behaviour, so
  // map NULL input to NULL output instead.
  const char * packed = sqlite3_value_text_s(args[0]);
  if (packed == NULL)
    {
      sqlite3_result_null(f);
      return;
    }

  data unpacked;
  unpack(base64< gzip<data> >(string(packed)), unpacked);
  // SQLITE_TRANSIENT: sqlite copies the bytes before 'unpacked' goes away
  sqlite3_result_blob(f, unpacked().c_str(), unpacked().size(), SQLITE_TRANSIENT);
}

// remember the application state.  __app is later handed to
// install_functions() when the database is opened and to
// migrate_monotone_schema() during migration.
void
database::set_app(app_state * app)
{
  __app = app;
}

// verify that the file named by filename starts with the sqlite 3 magic
// string.  raises a user-level error (N) if the file cannot be opened, is
// shorter than the magic string, or begins with anything else.
static void
check_sqlite_format_version(system_path const & filename)
{
  // sqlite 3 files begin with this constant string
  // (version 2 files begin with a different one)
  std::string version_string("SQLite format 3");

  std::ifstream file(filename.as_external().c_str());
  N(file, F("unable to probe database version in file %s") % filename);

  for (std::string::const_iterator i = version_string.begin();
       i != version_string.end(); ++i)
    {
      // get() leaves its argument untouched when nothing could be read
      // (e.g. the file is shorter than the magic string), so start from a
      // value that can never match and also check the stream state.
      char c = '\0';
      file.get(c);
      N(file && c == *i, F("database %s is not an sqlite version 3 file, "
                           "try dump and reload") % filename);
    }
}


// returns quietly if the last sqlite call on s succeeded; otherwise logs
// the error code and message and raises an E() failure carrying both,
// plus a hint for the generic SQLITE_ERROR code.
static void
assert_sqlite3_ok(sqlite3 *s)
{
  int const errcode = sqlite3_errcode(s);
  if (errcode == SQLITE_OK)
    return;

  const char * errmsg = sqlite3_errmsg(s);

  // sometimes sqlite is not very helpful, so we keep a table of errors
  // people have gotten and more helpful versions.  first log the code so
  // we can find _out_ what the confusing code was... note that code does
  // not uniquely identify the errmsg, unlike errno's.
  L(FL("sqlite error: %d: %s") % errcode % errmsg);

  std::string auxiliary_message;
  if (errcode == SQLITE_ERROR)
    auxiliary_message = _("make sure database and containing directory are writeable\n"
                          "and you have not run out of disk space");

  // errcode is known not to be SQLITE_OK here, so this always fires.
  // if the last message is empty, the \n will be stripped off too
  E(errcode == SQLITE_OK,
    // kind of string surgery to avoid ~duplicate strings
    F("sqlite error: %d: %s\n%s") % errcode % errmsg % auxiliary_message);
}

// return the sqlite handle, opening the database first if necessary.
//   init             - create the schema in a fresh database; only legal
//                      on the call that actually opens the database
//   migrating_format - skip check_format(); likewise only legal on the
//                      opening call
struct sqlite3 *
database::sql(bool init, bool migrating_format)
{
  if (__sql)
    {
      // already open: neither flag makes sense any more
      I(!init);
      I(!migrating_format);
      return __sql;
    }

  check_filename();

  if (! init)
    {
      check_db_exists();
      check_sqlite_format_version(filename);
    }

  open();

  if (init)
    {
      sqlite3_exec(__sql, schema_constant, NULL, NULL, NULL);
      assert_sqlite3_ok(__sql);
    }

  check_schema();
  install_functions(__app);
  install_views();

  if (!migrating_format)
    check_format();

  return __sql;
}

// create a brand new database file with the current schema.
// throws oops if the database is already open, and refuses to proceed if
// either the database file or a "<filename>-journal" file already exists.
void
database::initialize()
{
  if (__sql)
    throw oops("cannot initialize database while it is open");

  require_path_is_nonexistent(filename,
                              F("could not initialize database: %s: already exists")
                              % filename);

  // a journal with the same stem as the new database must not exist either
  system_path journal(filename.as_internal() + "-journal");
  require_path_is_nonexistent(journal,
                              F("existing (possibly stale) journal file '%s' "
                                "has same stem as new database '%s'\n"
                                "cancelling database creation")
                              % journal % filename);

  // sql(true) opens the file and executes the schema definition
  sqlite3 *s = sql(true);
  I(s != NULL);
}


// state shared between database::dump() and the sqlite3_exec() callbacks
// that write the dump.
struct
dump_request
{
  // start from null handles, so that the callbacks' non-NULL invariants
  // actually catch a field that was never assigned.
  dump_request() : sql(NULL), out(NULL) {}
  struct sqlite3 *sql;     // handle the callbacks run their queries on
  std::string table_name;  // table being dumped; set by dump_table_cb
  std::ostream *out;       // stream the generated sql text is written to
};

static int
dump_row_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(vals != NULL);
  I(dump->out != NULL);
  *(dump->out) << boost::format("INSERT INTO %s VALUES(") % dump->table_name;
  for (int i = 0; i < n; ++i)
    {
      if (i != 0)
        *(dump->out) << ',';

      if (vals[i] == NULL)
        *(dump->out) << "NULL";
      else
        {
          *(dump->out) << "'";
          for (char *cp = vals[i]; *cp; ++cp)
            {
              if (*cp == '\'')
                *(dump->out) << "''";
              else
                *(dump->out) << *cp;
            }
          *(dump->out) << "'";
        }
    }
  *(dump->out) << ");\n";
  return 0;
}

// sqlite3_exec() callback for the table listing in database::dump().
// each row is (name, type, sql): the table's creation statement is
// written out, then every row of the table is dumped via dump_row_cb.
// data points at the dump_request.  always returns 0.
static int
dump_table_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(dump->out != NULL);
  I(vals != NULL);
  // check the column count before touching vals[0..2]
  I(n == 3);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(string(vals[1]) == "table");
  *(dump->out) << vals[2] << ";\n";
  dump->table_name = string(vals[0]);
  string query = "SELECT * FROM " + string(vals[0]);
  sqlite3_exec(dump->sql, query.c_str(), dump_row_cb, data, NULL);
  assert_sqlite3_ok(dump->sql);
  return 0;
}

// sqlite3_exec() callback for the index listing in database::dump().
// each row is (name, type, sql); the index's creation statement is
// written to the dump.  data points at the dump_request.  always
// returns 0.
static int
dump_index_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(dump->out != NULL);
  I(vals != NULL);
  // check the column count before touching vals[0..2]
  I(n == 3);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(string(vals[1]) == "index");
  *(dump->out) << vals[2] << ";\n";
  return 0;
}

// write the whole database to out as sql text: the statements are
// wrapped in BEGIN EXCLUSIVE / COMMIT, every table (except sqlite_stat*)
// is emitted with its rows, followed by every index definition.
void
database::dump(ostream & out)
{
  char const * const list_tables =
    "SELECT name, type, sql FROM sqlite_master "
    "WHERE type='table' AND sql NOT NULL "
    "AND name not like 'sqlite_stat%' "
    "ORDER BY name";
  char const * const list_indexes =
    "SELECT name, type, sql FROM sqlite_master "
    "WHERE type='index' AND sql NOT NULL "
    "ORDER BY name";

  transaction_guard guard(*this);

  dump_request req;
  req.out = &out;
  req.sql = sql();

  out << "BEGIN EXCLUSIVE;\n";

  sqlite3_exec(req.sql, list_tables, dump_table_cb, &req, NULL);
  assert_sqlite3_ok(req.sql);

  sqlite3_exec(req.sql, list_indexes, dump_index_cb, &req, NULL);
  assert_sqlite3_ok(req.sql);

  out << "COMMIT;\n";
  guard.commit();
}

// create a new database file from the sql text read from in (the format
// written by dump()).  the target file must not exist yet.  the input is
// split at ';' and pieces are accumulated until sqlite considers them a
// complete statement, which is then executed.  the first statement that
// fails aborts the load with an error.
void
database::load(istream & in)
{
  string line;
  string sql_stmt;

  check_filename();

  require_path_is_nonexistent(filename,
                              F("cannot create %s; it already exists") % filename);

  open();

  while(in)
    {
      getline(in, line, ';');
      sql_stmt += line + ';';

      if (sqlite3_complete(sql_stmt.c_str()))
        {
          sqlite3_exec(__sql, sql_stmt.c_str(), NULL, NULL, NULL);
          // check every statement: the error code only reflects the most
          // recent call, so a failure in the middle of the input would
          // otherwise be hidden by later statements that succeed.
          assert_sqlite3_ok(__sql);
          sql_stmt.clear();
        }
    }

  assert_sqlite3_ok(__sql);
}


// run an arbitrary sql command and print the result to out: a header
// line with the command and the row count, then one line per row with
// the columns separated by " | ".
void
database::debug(string const & sql, ostream & out)
{
  results res;
  fetch(res, any_cols, any_rows, query(sql));
  out << "'" << sql << "' -> " << res.size() << " rows\n" << endl;

  for (results::const_iterator row = res.begin(); row != res.end(); ++row)
    {
      for (vector<string>::const_iterator cell = row->begin();
           cell != row->end(); ++cell)
        {
          if (cell != row->begin())
            out << " | ";
          out << *cell;
        }
      out << endl;
    }
}


namespace
{
  // add count to the running total and hand count back unchanged, so a
  // value can be summed up and used in the same expression.
  unsigned long
  add(unsigned long count, unsigned long & total)
  {
    total = total + count;
    return count;
  }
}

// print a summary of the database to out: the schema id, the row count
// of the main tables, and the number of bytes stored in each of them
// plus their total.
void
database::info(ostream & out)
{
  string id;
  calculate_schema_id(sql(), id);

  unsigned long total = 0UL;

  // SPACE_USAGE yields the byte count of one table and, through add(),
  // accumulates it into 'total' as a side effect.
#define SPACE_USAGE(TABLE, COLS) add(space_usage(TABLE, COLS), total)

  // NOTE(review): the trailing '% total' has to see the value after all
  // SPACE_USAGE calls have run; presumably the format operator takes its
  // argument by reference and reads it only when called -- confirm.
  out << \
    F("schema version    : %s\n"
      "counts:\n"
      "  full rosters    : %u\n"
      "  roster deltas   : %u\n"
      "  full files      : %u\n"
      "  file deltas     : %u\n"
      "  revisions       : %u\n"
      "  ancestry edges  : %u\n"
      "  certs           : %u\n"
      "bytes:\n"
      "  full rosters    : %u\n"
      "  roster deltas   : %u\n"
      "  full files      : %u\n"
      "  file deltas     : %u\n"
      "  revisions       : %u\n"
      "  cached ancestry : %u\n"
      "  certs           : %u\n"
      "  total           : %u\n"
      )
    % id
    // counts
    % count("rosters")
    % count("roster_deltas")
    % count("files")
    % count("file_deltas")
    % count("revisions")
    % count("revision_ancestry")
    % count("revision_certs")
    // bytes
    % SPACE_USAGE("rosters", "id || data")
    % SPACE_USAGE("roster_deltas", "id || base || delta")
    % SPACE_USAGE("files", "id || data")
    % SPACE_USAGE("file_deltas", "id || base || delta")
    % SPACE_USAGE("revisions", "id || data")
    % SPACE_USAGE("revision_ancestry", "parent || child")
    % SPACE_USAGE("revision_certs", "hash || id || name || value || keypair || signature")
    % total;

#undef SPACE_USAGE
}

// print the schema id of the database file to out.  the file is opened
// with open() rather than sql(), so none of the schema or format checks
// run; it is closed again before the id is printed.
void
database::version(ostream & out)
{
  string id;

  check_filename();
  check_db_exists();
  open();

  calculate_schema_id(__sql, id);

  close();

  out << F("database schema version: %s") % id << endl;
}

// bring the database file up to the current schema.  the file is opened
// with open() rather than sql(), so the schema check (which an old
// database would fail) is not run; migrate_monotone_schema() does the
// work and the file is closed afterwards.
void
database::migrate()
{
  check_filename();
  check_db_exists();
  open();

  migrate_monotone_schema(__sql, __app);

  close();
}

// make sure the database is open, running the usual checks performed by
// sql() if it has to be opened now.
void
database::ensure_open()
{
  sqlite3 *s = sql();
  I(s != NULL);
}

// like ensure_open(), but passes migrating_format = true to sql(), which
// makes it skip check_format().
void
database::ensure_open_for_format_changes()
{
  sqlite3 *s = sql(false, true);
  I(s != NULL);
}

// log how often each cached statement was executed, drop the statement
// cache and close the database.
database::~database()
{
  L(FL("statement cache statistics\n"));
  L(FL("prepared %d statements\n") % statement_cache.size());

  map<string, statement>::const_iterator i = statement_cache.begin();
  while (i != statement_cache.end())
    {
      L(FL("%d executions of %s\n") % i->second.count % i->first);
      ++i;
    }

  // trigger destructors to finalize cached statements
  statement_cache.clear();

  close();
}

// run a statement that is expected to produce no result: fetch() is
// asked for exactly 0 columns and 0 rows and fails otherwise.
void
database::execute(query const & query)
{
  results res;
  fetch(res, 0, 0, query );
}

// run a query and collect its result rows as strings.
//   res       - receives the rows; any previous content is discarded
//   want_cols - required column count, or any_cols to accept any
//   want_rows - required row count, or any_rows to accept any
//   query     - sql text plus the parameters for its '?' placeholders
// prepared statements are cached by sql text and reused.  fails with E()
// on sqlite errors, on multiple statements in one query, on a NULL column
// value, or on a column/row count mismatch, and with I() if the number
// of parameters does not match the number of placeholders.
void
database::fetch(results & res,
                int const want_cols,
                int const want_rows,
                query const & query)
{
  int nrow;
  int ncol;
  int rescode;

  res.clear();
  res.resize(0);

  map<string, statement>::iterator i = statement_cache.find(query.sql_cmd);
  if (i == statement_cache.end())
    {
      i = statement_cache.insert(make_pair(query.sql_cmd, statement())).first;
      I(i != statement_cache.end());

      try
        {
          const char * tail;
          sqlite3_prepare(sql(), query.sql_cmd.c_str(), -1, i->second.stmt.paddr(), &tail);
          assert_sqlite3_ok(sql());
          L(FL("prepared statement %s\n") % query.sql_cmd);

          // no support for multiple statements here
          E(*tail == 0,
            F("multiple statements in query: %s\n") % query.sql_cmd);
        }
      catch (...)
        {
          // do not leave a statement that failed to prepare (or that we
          // just rejected) in the cache; a later call with the same sql
          // text would otherwise pick it up as if it were usable.
          statement_cache.erase(i);
          throw;
        }
    }

  ncol = sqlite3_column_count(i->second.stmt());

  E(want_cols == any_cols || want_cols == ncol,
    F("wanted %d columns got %d in query: %s\n") % want_cols % ncol % query.sql_cmd);

  // bind parameters for this execution

  int params = sqlite3_bind_parameter_count(i->second.stmt());

  // Ensure that exactly the right number of parameters were given
  I(params == int(query.args.size()));

  // profiling finds this logging to be quite expensive
  if (global_sanity.debug)
    L(FL("binding %d parameters for %s\n") % params % query.sql_cmd);

  for (int param = 1; param <= params; param++)
    {
      // profiling finds this logging to be quite expensive
      if (global_sanity.debug)
        {
          string log = query.args[param-1].data;

          if (log.size() > constants::log_line_sz)
            log = log.substr(0, constants::log_line_sz);

          L(FL("binding %d with value '%s'\n") % param % log);
        }

      // SQLITE_STATIC: the bound bytes live in 'query', which outlives
      // the execution below, so sqlite need not copy them.
      switch (idx(query.args, param - 1).type)
        {
        case query_param::text:
          sqlite3_bind_text(i->second.stmt(), param,
                            idx(query.args, param - 1).data.c_str(), -1,
                            SQLITE_STATIC);
          break;
        case query_param::blob:
          {
            std::string const & data = idx(query.args, param - 1).data;
            sqlite3_bind_blob(i->second.stmt(), param,
                              data.data(), data.size(),
                              SQLITE_STATIC);
          }
          break;
        case query_param::integer:
          {
            uint32_t number = idx(query.args, param - 1).number;
            sqlite3_bind_int(i->second.stmt(), param, number);
          }
          break;
        default:
          I(false);
        }

      assert_sqlite3_ok(sql());
    }

  // execute and process results

  nrow = 0;
  try
    {
      for (rescode = sqlite3_step(i->second.stmt()); rescode == SQLITE_ROW;
           rescode = sqlite3_step(i->second.stmt()))
        {
          vector<string> row;
          for (int col = 0; col < ncol; col++)
            {
              const char * value = sqlite3_column_text_s(i->second.stmt(), col);
              E(value, F("null result in query: %s\n") % query.sql_cmd);
              row.push_back(value);
              //L(FL("row %d col %d value='%s'\n") % nrow % col % value);
            }
          res.push_back(row);
        }

      if (rescode != SQLITE_DONE)
        assert_sqlite3_ok(sql());
    }
  catch (...)
    {
      // the statement stays in the cache, so it must not be left in the
      // middle of an execution when we bail out.
      sqlite3_reset(i->second.stmt());
      throw;
    }

  sqlite3_reset(i->second.stmt());
  assert_sqlite3_ok(sql());

  nrow = res.size();

  i->second.count++;

  E(want_rows == any_rows || want_rows == nrow,
    F("wanted %d rows got %s in query: %s\n") % want_rows % nrow % query.sql_cmd);
}

// general application-level logic

// change the file this object refers to.  only allowed while the
// database is not open.
void
database::set_filename(system_path const & file)
{
  I(!__sql);
  filename = file;
}

// enter a (possibly nested) transaction.  only the outermost call issues
// a BEGIN statement; nested calls merely bump the nesting level.
void
database::begin_transaction(bool exclusive)
{
  if (transaction_level != 0)
    {
      // You can't start an exclusive transaction within a non-exclusive
      // transaction
      I(!exclusive || transaction_exclusive);
    }
  else
    {
      execute(query(exclusive ? "BEGIN EXCLUSIVE" : "BEGIN DEFERRED"));
      transaction_exclusive = exclusive;
    }
  ++transaction_level;
}

// leave one transaction level.  the COMMIT statement is only issued when
// the outermost level is being left.
void
database::commit_transaction()
{
  if (transaction_level == 1)
    execute(query("COMMIT"));
  transaction_level--;
}

// leave one transaction level.  the ROLLBACK statement is only issued
// when the outermost level is being left; leaving a nested level does
// not touch the database.
void
database::rollback_transaction()
{
  if (transaction_level == 1)
    execute(query("ROLLBACK"));
  transaction_level--;
}


// true iff table has a row whose id column equals ident.  at most one
// such row may exist.
bool
database::exists(hexenc<id> const & ident,
                      string const & table)
{
  results res;
  fetch(res, one_col, any_rows,
        query("SELECT id FROM " + table + " WHERE id = ?") % text(ident()));
  I((res.size() == 1) || (res.size() == 0));
  return !res.empty();
}


// true iff table holds at least one delta row for ident, whatever its
// base.
bool
database::delta_exists(hexenc<id> const & ident,
                       string const & table)
{
  results res;
  fetch(res, one_col, any_rows,
        query("SELECT id FROM " + table + " WHERE id = ?") % text(ident()));
  return !res.empty();
}

// true iff table holds a delta row with this id and this base.
bool
database::delta_exists(hexenc<id> const & ident,
                       hexenc<id> const & base,
                       string const & table)
{
  results res;
  fetch(res, one_col, any_rows,
        query("SELECT id FROM " + table + " WHERE id = ? AND base = ?")
        % text(ident())
        % text(base()));
  return !res.empty();
}

// number of rows in table.
unsigned long
database::count(string const & table)
{
  results res;
  fetch(res, one_col, one_row, query("SELECT COUNT(*) FROM " + table));
  return lexical_cast<unsigned long>(res[0][0]);
}

// summed LENGTH() of the sql expression concatenated_columns over all
// rows of table; 0 for an empty table.
unsigned long
database::space_usage(string const & table, string const & concatenated_columns)
{
  // COALESCE is required since SUM({empty set}) is NULL.
  // the sqlite docs for SUM suggest this as a workaround
  string const sum_query =
    "SELECT COALESCE(SUM(LENGTH(" + concatenated_columns + ")), 0) FROM " + table;

  results res;
  fetch(res, one_col, one_row, query(sum_query));
  return lexical_cast<unsigned long>(res[0][0]);
}

// add the id column of every row of table to ids.  ids is not cleared
// first.
void
database::get_ids(string const & table, set< hexenc<id> > & ids)
{
  results res;
  fetch(res, one_col, any_rows, query("SELECT id FROM " + table));

  for (results::const_iterator row = res.begin(); row != res.end(); ++row)
    ids.insert(hexenc<id>((*row)[0]));
}

// load the full data stored under ident in table into dat.  exactly one
// matching row must exist.  the unpacked data is re-hashed and must hash
// back to ident.
void
database::get(hexenc<id> const & ident,
              data & dat,
              string const & table)
{
  results res;
  fetch(res, one_col, one_row,
        query("SELECT data FROM " + table + " WHERE id = ?") % text(ident()));

  // the row holds base64-encoded gzip data; unpack it
  base64<gzip<data> > rdata(res[0][0]);
  data rdata_unpacked;
  unpack(rdata, rdata_unpacked);

  // consistency check
  hexenc<id> tid;
  calculate_ident(rdata_unpacked, tid);
  I(tid == ident);

  dat = rdata_unpacked;
}

// load the delta stored in table for the (ident, base) pair into del.
// both ids must be non-empty and exactly one matching row must exist.
void
database::get_delta(hexenc<id> const & ident,
                    hexenc<id> const & base,
                    delta & del,
                    string const & table)
{
  I(ident() != "");
  I(base() != "");

  results res;
  fetch(res, one_col, one_row,
        query("SELECT delta FROM " + table + " WHERE id = ? AND base = ?")
        % text(ident())
        % text(base()));

  // the row holds a base64-encoded gzip delta; unpack it into del
  base64<gzip<delta> > del_packed = res[0][0];
  unpack(del_packed, del);
}

// store dat in table under ident.  ident must be non-empty and must be
// the hash of dat.  the row written is (id, packed data, packed size).
void
database::put(hexenc<id> const & ident,
              data const & dat,
              string const & table)
{
  // consistency check
  I(ident() != "");
  hexenc<id> tid;
  calculate_ident(dat, tid);
  MM(ident);
  MM(tid);
  I(tid == ident);

  base64<gzip<data> > dat_packed;
  pack(dat, dat_packed);

  // the recorded size is that of the packed representation
  uint32_t const dat_size = dat_packed().size();

  query ins("INSERT INTO " + table + " VALUES(?, ?, ?)");
  ins % text(ident())
      % text(dat_packed())
      % integer(dat_size);
  execute(ins);
}
// store the delta that reconstructs ident from base.  the row written is
// (id, base, packed delta, parent_distance + 1,
//  parent_size + packed size, packed size).
void
database::put_delta(hexenc<id> const & ident,
                    hexenc<id> const & base,
                    delta const & del,
                    size_t parent_distance,
                    size_t parent_size,
                    string const & table)
{
  I(!null_id(ident));
  I(!null_id(base));

  base64<gzip<delta> > del_packed;
  pack(del, del_packed);

  // the recorded sizes refer to the packed representation
  uint32_t const del_size = del_packed().size();

  query ins("INSERT INTO "+table+" VALUES(?, ?, ?, ?, ?, ?)");
  ins % text(ident())
      % text(base())
      % text(del_packed())
      % integer(parent_distance + 1)
      % integer(parent_size + del_size)
      % integer(del_size);
  execute(ins);
}

// static ticker cache_hits("vcache hits", "h", 1);

struct version_cache
{
  size_t capacity;
  size_t use;
  std::map<hexenc<id>, data> cache;

  version_cache() : capacity(constants::db_version_cache_sz), use(0)
  {
    srand(time(NULL));
  }

  void put(hexenc<id> const & ident, data const & dat)
  {
    while (!cache.empty()
           && use + dat().size() > capacity)
      {
        std::string key = (boost::format("%08.8x%08.8x%08.8x%08.8x%08.8x")
                           % rand() % rand() % rand() % rand() % rand()).str();
        std::map<hexenc<id>, data>::const_iterator i;
        i = cache.lower_bound(hexenc<id>(key));
        if (i == cache.end())
          {
            // we can't find a random entry, probably there's only one
            // entry and we missed it. delete first entry instead.
            i = cache.begin();
          }
        I(i != cache.end());
        I(use >= i->second().size());
        L(FL("version cache expiring %s\n") % i->first);
        use -= i->second().size();
        cache.erase(i->first);
      }
    cache.insert(std::make_pair(ident, dat));
    use += dat().size();
  }

  bool exists(hexenc<id> const & ident)
  {
    std::map<hexenc<id>, data>::const_iterator i;
    i = cache.find(ident);
    return i != cache.end();
  }

  bool get(hexenc<id> const & ident, data & dat)
  {
    std::map<hexenc<id>, data>::const_iterator i;
    i = cache.find(ident);
    if (i == cache.end())
      return false;
    // ++cache_hits;
    L(FL("version cache hit on %s\n") % ident);
    dat = i->second;
    return true;
  }
};

// the single version cache of this file, consulted and filled by
// database::get_version().
static version_cache vcache;

// a sequence of ids; get_version() uses it for a chain of versions
// leading from the wanted id towards one whose full data is available.
typedef vector< hexenc<id> > version_path;

// try to extend path p by the node ext.
//   - if ext already occurs on p, the storage graph has a cycle: throw
//     oops, naming table_name and the node.
//   - if ext was already reached by some path (it is in seen_nodes), do
//     nothing; p is thereby dropped from the next round.
//   - otherwise append ext to p, queue p in next_paths and mark ext seen.
static void
extend_path_if_not_cycle(string table_name,
                         shared_ptr<version_path> p,
                         hexenc<id> const & ext,
                         set< hexenc<id> > & seen_nodes,
                         vector< shared_ptr<version_path> > & next_paths)
{
  version_path::const_iterator node = p->begin();
  while (node != p->end())
    {
      if ((*node)() == ext())
        throw oops("cycle in table '" + table_name + "', at node "
                   + (*node)() + " <- " + ext());
      ++node;
    }

  if (seen_nodes.find(ext) != seen_nodes.end())
    return;

  p->push_back(ext);
  next_paths.push_back(p);
  seen_nodes.insert(ext);
}

// load the version ident into dat.
//   - if it is in the version cache, it is served from there;
//   - if data_table holds it as a full object, it is read directly;
//   - otherwise it is reconstructed by finding a chain of deltas in
//     delta_table that leads to a full (or cached) object and applying
//     the deltas along that chain.
// the result of the last two cases is added to the version cache.  a
// reconstructed result is re-hashed and must hash back to ident.
void
database::get_version(hexenc<id> const & ident,
                      data & dat,
                      string const & data_table,
                      string const & delta_table)
{
  I(ident() != "");

  if (vcache.get(ident, dat))
    {
      return;
    }
  else if (exists(ident, data_table))
    {
     // easy path
     get(ident, dat, data_table);
    }
  else
    {
      // tricky path

      // we start from the file we want to reconstruct and work *forwards*
      // through the database, until we get to a full data object. we then
      // trace back through the list of edges we followed to get to the data
      // object, applying reverse deltas.
      //
      // the effect of this algorithm is breadth-first search, backwards
      // through the storage graph, to discover a forwards shortest path, and
      // then following that shortest path with delta application.
      //
      // we used to do this with the boost graph library, but it involved
      // loading too much of the storage graph into memory at any moment. this
      // imperative version only loads the descendants of the reconstruction
      // node, so it is much cheaper in terms of memory.
      //
      // we also maintain a cycle-detecting set, just to be safe

      L(FL("reconstructing %s in %s\n") % ident % delta_table);
      I(delta_exists(ident, delta_table));

      // Our reconstruction algorithm involves keeping a set of parallel
      // linear paths, starting from ident, moving forward through the
      // storage DAG until we hit a storage root.
      //
      // On each iteration, we extend every active path by one step. If our
      // extension involves a fork, we duplicate the path. If any path
      // contains a cycle, we fault.
      //
      // If, by extending a path C, we enter a node which another path
      // D has already seen, we kill path C. This avoids the possibility of
      // exponential growth in the number of paths due to extensive forking
      // and merging.

      vector< shared_ptr<version_path> > live_paths;

      string delta_query = "SELECT base FROM " + delta_table + " WHERE id = ?";

      {
        // the search starts with a single path holding just ident
        shared_ptr<version_path> pth0 = shared_ptr<version_path>(new version_path());
        pth0->push_back(ident);
        live_paths.push_back(pth0);
      }

      shared_ptr<version_path> selected_path;
      set< hexenc<id> > seen_nodes;

      while (!selected_path)
        {
          vector< shared_ptr<version_path> > next_paths;

          for (vector<shared_ptr<version_path> >::const_iterator i = live_paths.begin();
               i != live_paths.end(); ++i)
            {
              shared_ptr<version_path> pth = *i;
              hexenc<id> tip = pth->back();

              // a tip whose full data is cached or stored ends the search
              if (vcache.exists(tip) || exists(tip, data_table))
                {
                  selected_path = pth;
                  break;
                }
              else
                {
                  // This tip is not a root, so extend the path.
                  results res;
                  fetch(res, one_col, any_rows,
                        query(delta_query)
                        % text(tip()));

                  I(res.size() != 0);

                  // Replicate the path if there's a fork.
                  for (size_t k = 1; k < res.size(); ++k)
                    {
                      shared_ptr<version_path> pthN
                        = shared_ptr<version_path>(new version_path(*pth));
                      extend_path_if_not_cycle(delta_table, pthN,
                                               hexenc<id>(res[k][0]),
                                               seen_nodes, next_paths);
                    }

                  // And extend the base path we're examining.
                  extend_path_if_not_cycle(delta_table, pth,
                                           hexenc<id>(res[0][0]),
                                           seen_nodes, next_paths);
                }
            }

          // either we found a root, or there must be paths left to extend
          I(selected_path || !next_paths.empty());
          live_paths = next_paths;
        }

      // Found a root, now trace it back along the path.

      I(selected_path);
      I(selected_path->size() > 1);

      hexenc<id> curr = selected_path->back();
      selected_path->pop_back();
      data begin;

      if (vcache.exists(curr))
        {
          // NOTE(review): the call inside I() is what fills 'begin'; this
          // relies on I() always evaluating its argument.
          I(vcache.get(curr, begin));
        }
      else
        {
          get(curr, begin, data_table);
        }

      boost::shared_ptr<delta_applicator> app = new_piecewise_applicator();
      app->begin(begin());

      // walk the path from the root back towards ident, applying the delta
      // stored for each step
      for (version_path::reverse_iterator i = selected_path->rbegin();
           i != selected_path->rend(); ++i)
        {
          hexenc<id> const nxt = *i;

          /*
          if (!vcache.exists(curr))
            {
              string tmp;
              app->finish(tmp);
              vcache.put(curr, tmp);
            }
            */

          L(FL("following delta %s -> %s\n") % curr % nxt);
          delta del;
          get_delta(nxt, curr, del, delta_table);
          apply_delta (app, del());

          app->next();
          curr = nxt;
        }

      string tmp;
      app->finish(tmp);
      dat = data(tmp);

      // the reconstructed data must hash back to the id we were asked for
      hexenc<id> final;
      calculate_ident(dat, final);
      I(final == ident);
    }
  vcache.put(ident, dat);
}


// delete every row of table whose id column equals ident.
void
database::drop(hexenc<id> const & ident,
               string const & table)
{
  execute(query("DELETE FROM " + table + " WHERE id = ?") % text(ident()));
}

// look up the stored path distance and size of id.
// if data_table has a row for id, distance is 0 and size is that row's
// size column.  otherwise the path_dist and path_size columns of the
// first matching row in delta_table are returned.  it is an invariant
// failure for id to be in neither table.
void
database::get_distance(hexenc<id> const & id,
                       uint32_t & distance,
                       uint32_t & size,
                       string const & data_table,
                       string const & delta_table)
{
  MM(id);
  MM(data_table);
  MM(delta_table);

  // stored as a full object?
  {
    results res;
    fetch(res, one_col, any_rows,
          query("SELECT size FROM " + data_table + " WHERE id = ?")
          % text(id()));
    if (!res.empty())
      {
        distance = 0;
        size = lexical_cast<uint32_t>(res[0][0]);
        return;
      }
  }

  // stored as a delta?
  {
    results res;
    fetch(res, 2, any_rows,
          query("SELECT path_dist, path_size FROM " + delta_table + " WHERE id = ?")
          % text(id()));
    if (!res.empty())
      {
        distance = lexical_cast<uint32_t>(res[0][0]);
        size = lexical_cast<uint32_t>(res[0][1]);
        return;
      }
  }

  I(false);
}

void
database::put_version(hexenc<id> const & old_id,
                      hexenc<id> const & new_id,
                      delta const & del,
                      data const & dat,
                      string const & data_table,
                      string const & delta_table)
{
  // TODO: add an invariant or something perhaps
  static ticker put_full("full", "f", 1);
  static ticker put_del("del", "d", 1);

  transaction_guard guard(*this);
  uint32_t parent_distance, parent_size