Below is the file 'database.cc' from this revision. You can also download the file.

// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*-
// copyright (C) 2002, 2003 graydon hoare <graydon@pobox.com>
// all rights reserved.
// licensed to the public under the terms of the GNU GPL (>= 2)
// see the file COPYING for details

#include <fstream>
#include <iterator>
#include <list>
#include <set>
#include <sstream>
#include <vector>

#include <string.h>

#include <boost/shared_ptr.hpp>
#include <boost/lexical_cast.hpp>

#include <sqlite3.h>

#include "app_state.hh"
#include "cert.hh"
#include "cleanup.hh"
#include "constants.hh"
#include "database.hh"
#include "keys.hh"
#include "sanity.hh"
#include "schema_migration.hh"
#include "transforms.hh"
#include "ui.hh"
#include "vocab.hh"
#include "xdelta.hh"
#include "epoch.hh"

// defined in schema.sql, converted to header:
#include "schema.h"

// defined in views.sql, converted to header:
#include "views.h"

// this file defines a public, typed interface to the database.
// the database class encapsulates all knowledge about sqlite,
// the schema, and all SQL statements used to access the schema.
//
// see file schema.sql for the text of the schema.

using boost::shared_ptr;
using boost::lexical_cast;
using namespace std;

int const one_row = 1;
int const one_col = 1;
int const any_rows = -1;
int const any_cols = -1;

extern "C" {
// some wrappers to ease migration
  const char *sqlite3_value_text_s(sqlite3_value *v);
  const char *sqlite3_column_text_s(sqlite3_stmt*, int col);
}

database::database(system_path const & fn) :
  filename(fn),
  // nb. update this if you change the schema. unfortunately we are not
  // using self-digesting schemas due to comment irregularities and
  // non-alphabetic ordering of tables in sql source files. we could create
  // a temporary db, write our intended schema into it, and read it back,
  // but this seems like it would be too rude. possibly revisit this issue.
  schema("1509fd75019aebef5ac3da3a5edf1312393b70e9"),
  __sql(NULL),
  transaction_level(0)
{}

void
database::check_schema()
{
  string db_schema_id;
  calculate_schema_id (__sql, db_schema_id);
  N (schema == db_schema_id,
     F("database schemas do not match: "
       "wanted %s, got %s. try migrating database")
     % schema % db_schema_id);
}

// sqlite3_value_text gives a const unsigned char * but most of the time
// we need a signed char
const char *
sqlite3_value_text_s(sqlite3_value *v)
{
  return (const char *)(sqlite3_value_text(v));
}

const char *
sqlite3_column_text_s(sqlite3_stmt *stmt, int col)
{
  return (const char *)(sqlite3_column_text(stmt, col));
}

static void
sqlite3_unbase64_fn(sqlite3_context *f, int nargs, sqlite3_value ** args)
{
  if (nargs != 1)
    {
      sqlite3_result_error(f, "need exactly 1 arg to unbase64()", -1);
      return;
    }
  data decoded;
  decode_base64(base64<data>(string(sqlite3_value_text_s(args[0]))), decoded);
  sqlite3_result_blob(f, decoded().c_str(), decoded().size(), SQLITE_TRANSIENT);
}

static void
sqlite3_unpack_fn(sqlite3_context *f, int nargs, sqlite3_value ** args)
{
  if (nargs != 1)
    {
      sqlite3_result_error(f, "need exactly 1 arg to unpack()", -1);
      return;
    }
  data unpacked;
  unpack(base64< gzip<data> >(string(sqlite3_value_text_s(args[0]))), unpacked);
  sqlite3_result_blob(f, unpacked().c_str(), unpacked().size(), SQLITE_TRANSIENT);
}

void
database::set_app(app_state * app)
{
  __app = app;
}

static void
check_sqlite_format_version(system_path const & filename)
{
  require_path_is_file(filename,
                       F("database %s does not exist") % filename,
                       F("%s is a directory, not a database") % filename);

  // sqlite 3 files begin with this constant string
  // (version 2 files begin with a different one)
  std::string version_string("SQLite format 3");

  std::ifstream file(filename.as_external().c_str());
  N(file, F("unable to probe database version in file %s") % filename);

  for (std::string::const_iterator i = version_string.begin();
       i != version_string.end(); ++i)
    {
      char c;
      file.get(c);
      N(c == *i, F("database %s is not an sqlite version 3 file, "
                   "try dump and reload") % filename);
    }
}


static void
assert_sqlite3_ok(sqlite3 *s)
{
  int errcode = sqlite3_errcode(s);

  if (errcode == SQLITE_OK) return;

  const char * errmsg = sqlite3_errmsg(s);

  // sometimes sqlite is not very helpful
  // so we keep a table of errors people have gotten and more helpful versions
  if (errcode != SQLITE_OK)
    {
      // first log the code so we can find _out_ what the confusing code
      // was... note that code does not uniquely identify the errmsg, unlike
      // errno's.
      L(F("sqlite error: %d: %s") % errcode % errmsg);
    }
  std::string auxiliary_message = "";
  if (errcode == SQLITE_ERROR)
    {
      auxiliary_message = _("make sure database and containing directory are writeable");
    }
  // if the last message is empty, the \n will be stripped off too
  E(errcode == SQLITE_OK,
    // kind of string surgery to avoid ~duplicate strings
    boost::format("%s\n%s")
                  % (F("sqlite error: %d: %s") % errcode % errmsg).str()
                  % auxiliary_message);
}

struct sqlite3 *
database::sql(bool init)
{
  if (! __sql)
    {
      check_filename();

      if (! init)
        {
          require_path_is_file(filename,
                               F("database %s does not exist") % filename,
                               F("%s is a directory, not a database") % filename);
          check_sqlite_format_version(filename);
        }

      open();

      if (init)
        {
          sqlite3_exec(__sql, schema_constant, NULL, NULL, NULL);
          assert_sqlite3_ok(__sql);
        }

      check_schema();
      install_functions(__app);
      install_views();
    }
  return __sql;
}

void
database::initialize()
{
  if (__sql)
    throw oops("cannot initialize database while it is open");

  require_path_is_nonexistent(filename,
                              F("could not initialize database: %s: already exists")
                              % filename);

  system_path journal(filename.as_internal() + "-journal");
  require_path_is_nonexistent(journal,
                              F("existing (possibly stale) journal file '%s' "
                                "has same stem as new database '%s'\n"
                                "cancelling database creation")
                              % journal % filename);

  sqlite3 *s = sql(true);
  I(s != NULL);
}


struct
dump_request
{
  dump_request() {};
  struct sqlite3 *sql;
  string table_name;
  ostream *out;
};

static int
dump_row_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(vals != NULL);
  I(dump->out != NULL);
  *(dump->out) << boost::format("INSERT INTO %s VALUES(") % dump->table_name;
  for (int i = 0; i < n; ++i)
    {
      if (i != 0)
        *(dump->out) << ',';

      if (vals[i] == NULL)
        *(dump->out) << "NULL";
      else
        {
          *(dump->out) << "'";
          for (char *cp = vals[i]; *cp; ++cp)
            {
              if (*cp == '\'')
                *(dump->out) << "''";
              else
                *(dump->out) << *cp;
            }
          *(dump->out) << "'";
        }
    }
  *(dump->out) << ");\n";
  return 0;
}

static int
dump_table_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(vals != NULL);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(n == 3);
  I(string(vals[1]) == "table");
  *(dump->out) << vals[2] << ";\n";
  dump->table_name = string(vals[0]);
  string query = "SELECT * FROM " + string(vals[0]);
  sqlite3_exec(dump->sql, query.c_str(), dump_row_cb, data, NULL);
  assert_sqlite3_ok(dump->sql);
  return 0;
}

static int
dump_index_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(vals != NULL);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(n == 3);
  I(string(vals[1]) == "index");
  *(dump->out) << vals[2] << ";\n";
  return 0;
}

void
database::dump(ostream & out)
{
  dump_request req;
  req.out = &out;
  req.sql = sql();
  out << "BEGIN TRANSACTION;\n";
  int res;
  res = sqlite3_exec(req.sql,
                        "SELECT name, type, sql FROM sqlite_master "
                        "WHERE type='table' AND sql NOT NULL "
                        "ORDER BY name",
                        dump_table_cb, &req, NULL);
  assert_sqlite3_ok(req.sql);
  res = sqlite3_exec(req.sql,
                        "SELECT name, type, sql FROM sqlite_master "
                        "WHERE type='index' AND sql NOT NULL "
                        "ORDER BY name",
                        dump_index_cb, &req, NULL);
  assert_sqlite3_ok(req.sql);
  out << "COMMIT;\n";
}

void
database::load(istream & in)
{
  char buf[constants::bufsz];
  string tmp;

  check_filename();

  require_path_is_nonexistent(filename,
                              F("cannot create %s; it already exists") % filename);

  open();

  while(in)
    {
      in.read(buf, constants::bufsz);
      tmp.append(buf, in.gcount());

      const char* last_statement = 0;
      sqlite3_complete_last(tmp.c_str(), &last_statement);
      if (last_statement == 0)
        continue;
      string::size_type len = last_statement + 1 - tmp.c_str();
      sqlite3_exec(__sql, tmp.substr(0, len).c_str(), NULL, NULL, NULL);
      tmp.erase(0, len);
    }

  if (!tmp.empty())
    sqlite3_exec(__sql, tmp.c_str(), NULL, NULL, NULL);
  assert_sqlite3_ok(__sql);
}


void
database::debug(string const & sql, ostream & out)
{
  results res;
  fetch(res, any_cols, any_rows, sql.c_str());
  out << "'" << sql << "' -> " << res.size() << " rows\n" << endl;
  for (size_t i = 0; i < res.size(); ++i)
    {
      for (size_t j = 0; j < res[i].size(); ++j)
        {
          if (j != 0)
            out << " | ";
          out << res[i][j];
        }
      out << endl;
    }
}


namespace
{
  unsigned long
  add(unsigned long count, unsigned long & total)
  {
    total += count;
    return count;
  }
}

void
database::info(ostream & out)
{
  string id;
  calculate_schema_id(sql(), id);

  unsigned long total = 0UL;

#define SPACE_USAGE(TABLE, COLS) add(space_usage(TABLE, COLS), total)

  out << \
    F("schema version    : %s\n"
      "counts:\n"
      "  full manifests  : %u\n"
      "  manifest deltas : %u\n"
      "  full files      : %u\n"
      "  file deltas     : %u\n"
      "  revisions       : %u\n"
      "  ancestry edges  : %u\n"
      "  certs           : %u\n"
      "bytes:\n"
      "  full manifests  : %u\n"
      "  manifest deltas : %u\n"
      "  full files      : %u\n"
      "  file deltas     : %u\n"
      "  revisions       : %u\n"
      "  cached ancestry : %u\n"
      "  certs           : %u\n"
      "  total           : %u\n"
      )
    % id
    // counts
    % count("manifests")
    % count("manifest_deltas")
    % count("files")
    % count("file_deltas")
    % count("revisions")
    % count("revision_ancestry")
    % count("revision_certs")
    // bytes
    % SPACE_USAGE("manifests", "id || data")
    % SPACE_USAGE("manifest_deltas", "id || base || delta")
    % SPACE_USAGE("files", "id || data")
    % SPACE_USAGE("file_deltas", "id || base || delta")
    % SPACE_USAGE("revisions", "id || data")
    % SPACE_USAGE("revision_ancestry", "parent || child")
    % SPACE_USAGE("revision_certs", "hash || id || name || value || keypair || signature")
    % total;

#undef SPACE_USAGE
}

void
database::version(ostream & out)
{
  string id;

  check_filename();
  open();

  calculate_schema_id(__sql, id);

  sqlite3_close(__sql);

  out << F("database schema version: %s") % id << endl;
}

void
database::migrate()
{
  check_filename();

  open();

  migrate_monotone_schema(__sql);
  sqlite3_close(__sql);
}

void
database::rehash()
{
  transaction_guard guard(*this);
  ticker mcerts(_("mcerts"), "m", 1);
  ticker pubkeys(_("pubkeys"), "+", 1);
  ticker privkeys(_("privkeys"), "!", 1);

  {
    // rehash all mcerts
    results res;
    vector<cert> certs;
    fetch(res, 5, any_rows,
          "SELECT id, name, value, keypair, signature "
          "FROM manifest_certs");
    results_to_certs(res, certs);
    execute("DELETE FROM manifest_certs");
    for(vector<cert>::const_iterator i = certs.begin(); i != certs.end(); ++i)
      {
        put_cert(*i, "manifest_certs");
        ++mcerts;
      }
  }

  {
    // rehash all pubkeys
    results res;
    fetch(res, 2, any_rows, "SELECT id, keydata FROM public_keys");
    execute("DELETE FROM public_keys");
    for (size_t i = 0; i < res.size(); ++i)
      {
        hexenc<id> tmp;
        key_hash_code(rsa_keypair_id(res[i][0]), base64<rsa_pub_key>(res[i][1]), tmp);
        execute("INSERT INTO public_keys VALUES(?, ?, ?)",
                tmp().c_str(), res[i][0].c_str(), res[i][1].c_str());
        ++pubkeys;
      }
  }

  {
    // rehash all privkeys
    results res;
    fetch(res, 2, any_rows, "SELECT id, keydata FROM private_keys");
    execute("DELETE FROM private_keys");
    for (size_t i = 0; i < res.size(); ++i)
      {
        hexenc<id> tmp;
        key_hash_code(rsa_keypair_id(res[i][0]), base64< arc4<rsa_priv_key> >(res[i][1]), tmp);
        execute("INSERT INTO private_keys VALUES(?, ?, ?)",
                tmp().c_str(), res[i][0].c_str(), res[i][1].c_str());
        ++privkeys;
      }
  }

  guard.commit();
}

void
database::ensure_open()
{
  sqlite3 *s = sql();
  I(s != NULL);
}

database::~database()
{
  L(F("statement cache statistics\n"));
  L(F("prepared %d statements\n") % statement_cache.size());

  for (map<string, statement>::const_iterator i = statement_cache.begin();
       i != statement_cache.end(); ++i)
    {
      L(F("%d executions of %s\n") % i->second.count % i->first);
      sqlite3_finalize(i->second.stmt);
    }

  if (__sql)
    {
      sqlite3_close(__sql);
      __sql = 0;
    }
}

void
database::execute(char const * query, ...)
{
  results res;
  va_list args;
  va_start(args, query);
  fetch(res, 0, 0, query, args);
  va_end(args);
}

void
database::fetch(results & res,
                int const want_cols,
                int const want_rows,
                char const * query, ...)
{
  va_list args;
  va_start(args, query);
  fetch(res, want_cols, want_rows, query, args);
  va_end(args);
}

void
database::fetch(results & res,
                int const want_cols,
                int const want_rows,
                char const * query,
                va_list args)
{
  int nrow;
  int ncol;
  int rescode;

  res.clear();
  res.resize(0);

  map<string, statement>::iterator i = statement_cache.find(query);
  if (i == statement_cache.end())
    {
      statement_cache.insert(make_pair(query, statement()));
      i = statement_cache.find(query);
      I(i != statement_cache.end());

      const char * tail;
      sqlite3_prepare(sql(), query, -1, &i->second.stmt, &tail);
      assert_sqlite3_ok(sql());
      L(F("prepared statement %s\n") % query);

      // no support for multiple statements here
      E(*tail == 0,
        F("multiple statements in query: %s\n") % query);
    }

  ncol = sqlite3_column_count(i->second.stmt);

  E(want_cols == any_cols || want_cols == ncol,
    F("wanted %d columns got %d in query: %s\n") % want_cols % ncol % query);

  // bind parameters for this execution

  int params = sqlite3_bind_parameter_count(i->second.stmt);

  L(F("binding %d parameters for %s\n") % params % query);

  for (int param = 1; param <= params; param++)
    {
      char *value = va_arg(args, char *);
      // nb: transient will not be good for inserts with large data blobs
      // however, it's no worse than the previous '%q' stuff in this regard
      // might want to wrap this logging with --debug or --verbose to limit it

      string log = string(value);

      if (log.size() > constants::log_line_sz)
        log = log.substr(0, constants::log_line_sz);

      L(F("binding %d with value '%s'\n") % param % log);

      sqlite3_bind_text(i->second.stmt, param, value, -1, SQLITE_TRANSIENT);
      assert_sqlite3_ok(sql());
    }

  // execute and process results

  nrow = 0;
  for (rescode = sqlite3_step(i->second.stmt); rescode == SQLITE_ROW;
       rescode = sqlite3_step(i->second.stmt))
    {
      vector<string> row;
      for (int col = 0; col < ncol; col++)
        {
          const char * value = sqlite3_column_text_s(i->second.stmt, col);
          E(value, F("null result in query: %s\n") % query);
          row.push_back(value);
          //L(F("row %d col %d value='%s'\n") % nrow % col % value);
        }
      res.push_back(row);
    }

  if (rescode != SQLITE_DONE)
    assert_sqlite3_ok(sql());

  sqlite3_reset(i->second.stmt);
  assert_sqlite3_ok(sql());

  nrow = res.size();

  i->second.count++;

  E(want_rows == any_rows || want_rows == nrow,
    F("wanted %d rows got %s in query: %s\n") % want_rows % nrow % query);
}

// general application-level logic

void
database::set_filename(system_path const & file)
{
  if (__sql)
    {
      throw oops((F("cannot change filename to %s while db is open") % file).str());
    }
  filename = file;
}

void
database::begin_transaction()
{
  if (transaction_level == 0)
    execute("BEGIN");
  transaction_level++;
}

void
database::commit_transaction()
{
  if (transaction_level == 1)
    execute("COMMIT");
  transaction_level--;
}

void
database::rollback_transaction()
{
  if (transaction_level == 1)
    execute("ROLLBACK");
  transaction_level--;
}


bool
database::exists(hexenc<id> const & ident,
                      string const & table)
{
  results res;
  string query = "SELECT id FROM " + table + " WHERE id = ?";
  fetch(res, one_col, any_rows, query.c_str(), ident().c_str());
  I((res.size() == 1) || (res.size() == 0));
  return res.size() == 1;
}


bool
database::delta_exists(hexenc<id> const & ident,
                       string const & table)
{
  results res;
  string query = "SELECT id FROM " + table + " WHERE id = ?";
  fetch(res, one_col, any_rows, query.c_str(), ident().c_str());
  return res.size() > 0;
}

bool
database::delta_exists(hexenc<id> const & ident,
                       hexenc<id> const & base,
                       string const & table)
{
  results res;
  string query = "SELECT id FROM " + table + " WHERE id = ? AND base = ?";
  fetch(res, one_col, any_rows, query.c_str(),
        ident().c_str(), base().c_str());
  I((res.size() == 1) || (res.size() == 0));
  return res.size() == 1;
}

unsigned long
database::count(string const & table)
{
  results res;
  string query = "SELECT COUNT(*) FROM " + table;
  fetch(res, one_col, one_row, query.c_str());
  return lexical_cast<unsigned long>(res[0][0]);
}

unsigned long
database::space_usage(string const & table, string const & concatenated_columns)
{
  results res;
  // COALESCE is required since SUM({empty set}) is NULL.
  // the sqlite docs for SUM suggest this as a workaround
  string query = "SELECT COALESCE(SUM(LENGTH(" + concatenated_columns + ")), 0) FROM " + table;
  fetch(res, one_col, one_row, query.c_str());
  return lexical_cast<unsigned long>(res[0][0]);
}

void
database::get_ids(string const & table, set< hexenc<id> > & ids)
{
  results res;
  string query = "SELECT id FROM " + table;
  fetch(res, one_col, any_rows, query.c_str());

  for (size_t i = 0; i < res.size(); ++i)
    {
      ids.insert(hexenc<id>(res[i][0]));
    }
}

void
database::get(hexenc<id> const & ident,
              data & dat,
              string const & table)
{
  results res;
  string query = "SELECT data FROM " + table + " WHERE id = ?";
  fetch(res, one_col, one_row, query.c_str(), ident().c_str());

  // consistency check
  base64<gzip<data> > rdata(res[0][0]);
  data rdata_unpacked;
  unpack(rdata, rdata_unpacked);

  hexenc<id> tid;
  calculate_ident(rdata_unpacked, tid);
  I(tid == ident);

  dat = rdata_unpacked;
}

void
database::get_delta(hexenc<id> const & ident,
                    hexenc<id> const & base,
                    delta & del,
                    string const & table)
{
  I(ident() != "");
  I(base() != "");
  results res;
  string query = "SELECT delta FROM " + table + " WHERE id = ? AND base = ?";
  fetch(res, one_col, one_row, query.c_str(),
        ident().c_str(), base().c_str());

  base64<gzip<delta> > del_packed = res[0][0];
  unpack(del_packed, del);
}

void
database::put(hexenc<id> const & ident,
              data const & dat,
              string const & table)
{
  // consistency check
  I(ident() != "");
  hexenc<id> tid;
  calculate_ident(dat, tid);
  I(tid == ident);

  base64<gzip<data> > dat_packed;
  pack(dat, dat_packed);

  string insert = "INSERT INTO " + table + " VALUES(?, ?)";
  execute(insert.c_str(),ident().c_str(), dat_packed().c_str());
}
void
database::put_delta(hexenc<id> const & ident,
                    hexenc<id> const & base,
                    delta const & del,
                    string const & table)
{
  // nb: delta schema is (id, base, delta)
  I(ident() != "");
  I(base() != "");

  base64<gzip<delta> > del_packed;
  pack(del, del_packed);

  string insert = "INSERT INTO "+table+" VALUES(?, ?, ?)";
  execute(insert.c_str(), ident().c_str(), base().c_str(), del_packed().c_str());
}

// static ticker cache_hits("vcache hits", "h", 1);

struct version_cache
{
  size_t capacity;
  size_t use;
  std::map<hexenc<id>, data> cache;

  version_cache() : capacity(constants::db_version_cache_sz), use(0)
  {
    srand(time(NULL));
  }

  void put(hexenc<id> const & ident, data const & dat)
  {
    while (!cache.empty()
           && use + dat().size() > capacity)
      {
        std::string key = (boost::format("%08.8x%08.8x%08.8x%08.8x%08.8x")
                           % rand() % rand() % rand() % rand() % rand()).str();
        std::map<hexenc<id>, data>::const_iterator i;
        i = cache.lower_bound(hexenc<id>(key));
        if (i == cache.end())
          {
            // we can't find a random entry, probably there's only one
            // entry and we missed it. delete first entry instead.
            i = cache.begin();
          }
        I(i != cache.end());
        I(use >= i->second().size());
        L(F("version cache expiring %s\n") % i->first);
        use -= i->second().size();
        cache.erase(i->first);
      }
    cache.insert(std::make_pair(ident, dat));
    use += dat().size();
  }

  bool exists(hexenc<id> const & ident)
  {
    std::map<hexenc<id>, data>::const_iterator i;
    i = cache.find(ident);
    return i != cache.end();
  }

  bool get(hexenc<id> const & ident, data & dat)
  {
    std::map<hexenc<id>, data>::const_iterator i;
    i = cache.find(ident);
    if (i == cache.end())
      return false;
    // ++cache_hits;
    L(F("version cache hit on %s\n") % ident);
    dat = i->second;
    return true;
  }
};

static version_cache vcache;

void
database::get_version(hexenc<id> const & ident,
                      data & dat,
                      string const & data_table,
                      string const & delta_table)
{
  I(ident() != "");

  if (vcache.get(ident, dat))
    {
      return;
    }
  else if (exists(ident, data_table))
    {
     // easy path
     get(ident, dat, data_table);
    }
  else
    {
      // tricky path

      // we start from the file we want to reconstruct and work *forwards*
      // through the database, until we get to a full data object. we then
      // trace back through the list of edges we followed to get to the data
      // object, applying reverse deltas.
      //
      // the effect of this algorithm is breadth-first search, backwards
      // through the storage graph, to discover a forwards shortest path, and
      // then following that shortest path with delta application.
      //
      // we used to do this with the boost graph library, but it invovled
      // loading too much of the storage graph into memory at any moment. this
      // imperative version only loads the descendents of the reconstruction
      // node, so it much cheaper in terms of memory.
      //
      // we also maintain a cycle-detecting set, just to be safe

      L(F("reconstructing %s in %s\n") % ident % delta_table);
      I(delta_exists(ident, delta_table));

      // nb: an edge map goes in the direction of the
      // delta, *not* the direction we discover things in,
      // i.e. each map is of the form [newid] -> [oldid]

      typedef map< hexenc<id>, hexenc<id> > edgemap;
      list< shared_ptr<edgemap> > paths;

      set< hexenc<id> > frontier, cycles;
      frontier.insert(ident);

      bool found_root = false;
      hexenc<id> root("");

      string delta_query = "SELECT base FROM " + delta_table + " WHERE id = ?";

      while (! found_root)
        {
          set< hexenc<id> > next_frontier;
          shared_ptr<edgemap> frontier_map(new edgemap());

          I(!frontier.empty());

          for (set< hexenc<id> >::const_iterator i = frontier.begin();
               i != frontier.end(); ++i)
            {
              if (vcache.exists(*i) || exists(*i, data_table))
                {
                  root = *i;
                  found_root = true;
                  break;
                }
              else
                {
                  cycles.insert(*i);
                  results res;

                  fetch(res, one_col, any_rows,
                        delta_query.c_str(), (*i)().c_str());

                  for (size_t k = 0; k < res.size(); ++k)
                    {
                      hexenc<id> const nxt(res[k][0]);

                      if (cycles.find(nxt) != cycles.end())
                        throw oops("cycle in table '" + delta_table + "', at node "
                                   + (*i)() + " <- " + nxt());

                      next_frontier.insert(nxt);

                      if (frontier_map->find(nxt) == frontier_map->end())
                        {
                          L(F("inserting edge: %s <- %s\n") % (*i) % nxt);
                          frontier_map->insert(make_pair(nxt, *i));
                        }
                      else
                        L(F("skipping merge edge %s <- %s\n") % (*i) % nxt);
                    }
                }
            }
          if (!found_root)
            {
              frontier = next_frontier;
              paths.push_front(frontier_map);
            }
        }

      // path built, now all we need to do is follow it back

      I(found_root);
      I(root() != "");
      data begin;

      if (vcache.exists(root))
        {
          I(vcache.get(root, begin));
        }
      else
        {
          get(root, begin, data_table);
        }

      hexenc<id> curr = root;

      boost::shared_ptr<delta_applicator> app = new_piecewise_applicator();
      app->begin(begin());

      for (list< shared_ptr<edgemap> >::const_iterator p = paths.begin();
           p != paths.end(); ++p)
        {
          shared_ptr<edgemap> i = *p;
          I(i->find(curr) != i->end());
          hexenc<id> const nxt = i->find(curr)->second;

          if (!vcache.exists(curr))
            {
              string tmp;
              app->finish(tmp);
              vcache.put(curr, tmp);
            }


          L(F("following delta %s -> %s\n") % curr % nxt);
          delta del;
          get_delta(nxt, curr, del, delta_table);
          apply_delta (app, del());

          app->next();
          curr = nxt;
        }

      string tmp;
      app->finish(tmp);
      dat = data(tmp);

      hexenc<id> final;
      calculate_ident(dat, final);
      I(final == ident);
    }
  vcache.put(ident, dat);
}


void
database::drop(hexenc<id> const & ident,
               string const & table)
{
  string drop = "DELETE FROM " + table + " WHERE id = ?";
  execute(drop.c_str(), ident().c_str());
}

void
database::put_version(hexenc<id> const & old_id,
                      hexenc<id> const & new_id,
                      delta const & del,
                      string const & data_table,
                      string const & delta_table)
{

  data old_data, new_data;
  delta reverse_delta;

  get_version(old_id, old_data, data_table, delta_table);
  patch(old_data, del, new_data);
  diff(new_data, old_data, reverse_delta);

  transaction_guard guard(*this);
  if (exists(old_id, data_table))
    {
      // descendent of a head version replaces the head, therefore old head
      // must be disposed of
      drop(old_id, data_table);
    }
  put(new_id, new_data, data_table);
  put_delta(old_id, new_id, reverse_delta, delta_table);
  guard.commit();
}

void
database::put_reverse_version(hexenc<id> const & new_id,
                              hexenc<id> const & old_id,
                              delta const & reverse_del,
                              string const & data_table,
                              string const & delta_table)
{
  data old_data, new_data;

  get_version(new_id, new_data, data_table, delta_table);
  patch(new_data, reverse_del, old_data);
  hexenc<id> check;
  calculate_ident(old_data, check);
  I(old_id == check);

  transaction_guard guard(*this);
  put_delta(old_id, new_id, reverse_del, delta_table);
  guard.commit();
}



// ------------------------------------------------------------
// --                                                        --
// --              public interface follows                  --
// --                                                        --
// ------------------------------------------------------------

bool
database::file_version_exists(file_id const & id)
{
  return delta_exists(id.inner(), "file_deltas")
    || exists(id.inner(), "files");
}

bool
database::manifest_version_exists(manifest_id const & id)
{
  return delta_exists(id.inner(), "manifest_deltas")
    || exists(id.inner(), "manifests");
}

bool
database::revision_exists(revision_id const & id)
{
  return exists(id.inner(), "revisions");
}

void
database::get_file_ids(set<file_id> & ids)
{
  ids.clear();
  set< hexenc<id> > tmp;
  get_ids("files", tmp);
  get_ids("file_deltas", tmp);
  ids.insert(tmp.begin(), tmp.end());
}

void
database::get_manifest_ids(set<manifest_id> & ids)
{
  ids.clear();
  set< hexenc<id> > tmp;
  get_ids("manifests", tmp);
  get_ids("manifest_deltas", tmp);
  ids.insert(tmp.begin(), tmp.end());
}

void
database::get_revision_ids(set<revision_id> & ids)
{
  ids.clear();
  set< hexenc<id> > tmp;
  get_ids("revisions", tmp);
  ids.insert(tmp.begin(), tmp.end());
}

void
database::get_file_version(file_id const & id,
                           file_data & dat)
{
  data tmp;
  get_version(id.inner(), tmp, "files", "file_deltas");
  dat = tmp;
}

void
database::get_manifest_version(manifest_id const & id,
                               manifest_data & dat)
{
  data tmp;
  get_version(id.inner(), tmp, "manifests", "manifest_deltas");
  dat = tmp;
}

void
database::get_manifest(manifest_id const & id,
                       manifest_map & mm)
{
  manifest_data mdat;
  get_manifest_version(id, mdat);
  read_manifest_map(mdat, mm);
}


void
database::put_file(file_id const & id,
                   file_data const & dat)