Below is the file 'database.cc' from this revision. You can also download the file.

// Copyright (C) 2002 Graydon Hoare <graydon@pobox.com>
//
// This program is made available under the GNU GPL version 2.0 or
// greater. See the accompanying file COPYING for details.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE.

#include "base.hh"
#include <algorithm>
#include <deque>
#include <fstream>
#include <iterator>
#include <list>
#include <set>
#include <sstream>
#include "vector.hh"

#include <string.h>

#include <boost/shared_ptr.hpp>
#include "lexical_cast.hh"

#include "sqlite/sqlite3.h"

#include "app_state.hh"
#include "cert.hh"
#include "cleanup.hh"
#include "constants.hh"
#include "dates.hh"
#include "database.hh"
#include "hash_map.hh"
#include "keys.hh"
#include "platform-wrapped.hh"
#include "revision.hh"
#include "safe_map.hh"
#include "sanity.hh"
#include "schema_migration.hh"
#include "transforms.hh"
#include "ui.hh"
#include "vocab.hh"
#include "vocab_cast.hh"
#include "xdelta.hh"
#include "epoch.hh"
#include "graph.hh"
#include "roster.hh"
#include "roster_delta.hh"
#include "rev_height.hh"
#include "vocab_hash.hh"
#include "globish.hh"
#include "work.hh"
#include "lua_hooks.hh"
#include "outdated_indicator.hh"
#include "lru_writeback_cache.hh"

#include "botan/botan.h"
#include "botan/rsa.h"
#include "botan/keypair.h"
#include "botan/pem.h"

// defined in schema.c, generated from schema.sql:
extern char const schema_constant[];

// this file defines a public, typed interface to the database.
// the database class encapsulates all knowledge about sqlite,
// the schema, and all SQL statements used to access the schema.
//
// see file schema.sql for the text of the schema.

using std::deque;
using std::istream;
using std::ifstream;
using std::make_pair;
using std::map;
using std::multimap;
using std::ostream;
using std::pair;
using std::set;
using std::string;
using std::vector;

using boost::shared_ptr;
using boost::shared_dynamic_cast;
using boost::lexical_cast;

using Botan::PK_Encryptor;
using Botan::PK_Verifier;
using Botan::SecureVector;
using Botan::X509_PublicKey;
using Botan::RSA_PublicKey;

int const one_row = 1;
int const one_col = 1;
int const any_rows = -1;
int const any_cols = -1;

namespace
{
  struct query_param
  {
    enum arg_type { text, blob };
    arg_type type;
    string data;
  };

  query_param
  text(string const & txt)
  {
    query_param q = {
      query_param::text,
      txt,
    };
    return q;
  }

  query_param
  blob(string const & blb)
  {
    query_param q = {
      query_param::blob,
      blb,
    };
    return q;
  }

  struct query
  {
    explicit query(string const & cmd)
      : sql_cmd(cmd)
    {}

    query()
    {}

    query & operator %(query_param const & qp)
    {
      args.push_back(qp);
      return *this;
    }

    vector<query_param> args;
    string sql_cmd;
  };

  typedef vector< vector<string> > results;

  struct statement
  {
    statement() : count(0), stmt(0, sqlite3_finalize) {}
    int count;
    cleanup_ptr<sqlite3_stmt*, int> stmt;
  };

  struct roster_size_estimator
  {
    unsigned long operator() (cached_roster const & cr)
    {
      I(cr.first);
      I(cr.second);
      // do estimate using a totally made up multiplier, probably wildly off
      return (cr.first->all_nodes().size()
              * constants::db_estimated_roster_node_sz);
    }
  };

  struct datasz
  {
    unsigned long operator()(data const & t) { return t().size(); }
  };

  enum open_mode { normal_mode = 0,
                   schema_bypass_mode,
                   format_bypass_mode };

  typedef hashmap::hash_map<revision_id, set<revision_id> > parent_id_map;
  typedef hashmap::hash_map<revision_id, rev_height> height_map;

  typedef hashmap::hash_map<rsa_keypair_id,
                            pair<shared_ptr<Botan::PK_Verifier>,
                                 shared_ptr<Botan::RSA_PublicKey> >
                            > verifier_cache;

} // anonymous namespace

class database_impl
{
  friend class database;

  // for scoped_ptr's sake
public:
  explicit database_impl(system_path const &);
  ~database_impl();

private:

  //
  // --== Opening the database and schema checking ==--
  //
  system_path const filename;
  struct sqlite3 * __sql;

  void install_functions();
  struct sqlite3 * sql(enum open_mode mode = normal_mode);

  void check_filename();
  void check_db_exists();
  void check_db_nonexistent();
  void open();
  void close();
  void check_format();

  //
  // --== Basic SQL interface and statement caching ==--
  //
  map<string, statement> statement_cache;

  void fetch(results & res,
             int const want_cols, int const want_rows,
             query const & q);
  void execute(query const & q);

  bool table_has_entry(id const & key, string const & column,
                       string const & table);

  //
  // --== Generic database metadata gathering ==--
  //
  string count(string const & table);
  string space(string const & table,
                    string const & concatenated_columns,
                    u64 & total);
  unsigned int page_size();
  unsigned int cache_size();

  //
  // --== Transactions ==--
  //
  int transaction_level;
  bool transaction_exclusive;
  void begin_transaction(bool exclusive);
  void commit_transaction();
  void rollback_transaction();
  friend class conditional_transaction_guard;

  struct roster_writeback_manager
  {
    database_impl & imp;
    roster_writeback_manager(database_impl & imp) : imp(imp) {}
    void writeout(revision_id const &, cached_roster const &);
  };
  LRUWritebackCache<revision_id, cached_roster,
                    roster_size_estimator, roster_writeback_manager>
    roster_cache;

  bool have_delayed_file(file_id const & id);
  void load_delayed_file(file_id const & id, file_data & dat);
  void cancel_delayed_file(file_id const & id);
  void drop_or_cancel_file(file_id const & id);
  void schedule_delayed_file(file_id const & id, file_data const & dat);

  map<file_id, file_data> delayed_files;
  size_t delayed_writes_size;

  void flush_delayed_writes();
  void clear_delayed_writes();
  void write_delayed_file(file_id const & new_id,
                          file_data const & dat);

  void write_delayed_roster(revision_id const & new_id,
                            roster_t const & roster,
                            marking_map const & marking);

  //
  // --== Reading/writing delta-compressed objects ==--
  //

  // "do we have any entry for 'ident' that is a base version"
  bool roster_base_stored(revision_id const & ident);
  bool roster_base_available(revision_id const & ident);

  // "do we have any entry for 'ident' that is a delta"
  bool delta_exists(file_id const & ident,
                    file_id const & base,
                    string const & table);

  bool file_or_manifest_base_exists(id const & ident,
                                    std::string const & table);

  void get_file_or_manifest_base_unchecked(id const & new_id,
                                           data & dat,
                                           string const & table);
  void get_file_or_manifest_delta_unchecked(id const & ident,
                                            id const & base,
                                            delta & del,
                                            string const & table);
  void get_roster_base(revision_id const & ident,
                       roster_t & roster, marking_map & marking);
  void get_roster_delta(id const & ident,
                        id const & base,
                        roster_delta & del);

  friend struct file_and_manifest_reconstruction_graph;
  friend struct roster_reconstruction_graph;

  LRUWritebackCache<id, data, datasz> vcache;

  void get_version(id const & ident,
                   data & dat,
                   string const & data_table,
                   string const & delta_table);

  void drop(id const & base,
            string const & table);
  void put_file_delta(file_id const & ident,
                      file_id const & base,
                      file_delta const & del);

  void put_roster_delta(revision_id const & ident,
                        revision_id const & base,
                        roster_delta const & del);
  void put_version(file_id const & old_id,
                   file_id const & new_id,
                   delta const & del,
                   string const & data_table,
                   string const & delta_table);

  //
  // --== The ancestry graph ==--
  //
  void get_ids(string const & table, set<id> & ids);

  //
  // --== Rosters ==--
  //
  struct extractor;
  struct file_content_extractor;
  struct markings_extractor;
  void extract_from_deltas(revision_id const & id, extractor & x);

  height_map height_cache;
  parent_id_map parent_cache;

  //
  // --== Keys ==--
  //
  void get_keys(string const & table, vector<rsa_keypair_id> & keys);

  // cache of verifiers for public keys
  verifier_cache verifiers;

  //
  // --== Certs ==--
  //
  // note: this section is ridiculous. please do something about it.
  bool cert_exists(cert const & t,
                   string const & table);
  void put_cert(cert const & t, string const & table);
  void results_to_certs(results const & res,
                        vector<cert> & certs);

  void get_certs(vector<cert> & certs,
                 string const & table);

  void get_certs(id const & ident,
                 vector<cert> & certs,
                 string const & table);

  void get_certs(cert_name const & name,
                 vector<cert> & certs,
                 string const & table);

  void get_certs(id const & ident,
                 cert_name const & name,
                 vector<cert> & certs,
                 string const & table);

  void get_certs(id const & ident,
                 cert_name const & name,
                 cert_value const & val,
                 vector<cert> & certs,
                 string const & table);

  void get_certs(cert_name const & name,
                 cert_value const & val,
                 vector<cert> & certs,
                 string const & table);

  outdated_indicator_factory cert_stamper;

  void add_prefix_matching_constraint(string const & colname,
                                      string const & prefix,
                                      query & q);
};

database_impl::database_impl(system_path const & f) :
  filename(f),
  __sql(NULL),
  transaction_level(0),
  roster_cache(constants::db_roster_cache_sz,
               roster_writeback_manager(*this)),
  delayed_writes_size(0),
  vcache(constants::db_version_cache_sz)
{}

database_impl::~database_impl()
{
  L(FL("statement cache statistics"));
  L(FL("prepared %d statements") % statement_cache.size());

  for (map<string, statement>::const_iterator i = statement_cache.begin();
       i != statement_cache.end(); ++i)
    L(FL("%d executions of %s") % i->second.count % i->first);
  // trigger destructors to finalize cached statements
  statement_cache.clear();

  if (__sql)
    close();
}

database::database(app_state & app)
  : imp(new database_impl(app.opts.dbname)), lua(app.lua)
{}

database::~database()
{}

system_path
database::get_filename()
{
  return imp->filename;
}

bool
database::is_dbfile(any_path const & file)
{
  system_path fn(file); // canonicalize
  bool same = (imp->filename == fn);
  if (same)
    L(FL("'%s' is the database file") % file);
  return same;
}

bool
database::database_specified()
{
  return !imp->filename.empty();
}

void
database::check_is_not_rosterified()
{
  results res;
  imp->fetch(res, one_col, any_rows,
             query("SELECT 1 FROM rosters LIMIT 1"));
  N(res.empty(),
    F("this database already contains rosters"));
}

void
database_impl::check_format()
{
  results res;

  // Check for manifests, revisions, rosters, and heights.
  fetch(res, one_col, any_rows, query("SELECT 1 FROM manifests LIMIT 1"));
  bool have_manifests = !res.empty();
  fetch(res, one_col, any_rows, query("SELECT 1 FROM revisions LIMIT 1"));
  bool have_revisions = !res.empty();
  fetch(res, one_col, any_rows, query("SELECT 1 FROM rosters LIMIT 1"));
  bool have_rosters = !res.empty();
  fetch(res, one_col, any_rows, query("SELECT 1 FROM heights LIMIT 1"));
  bool have_heights = !res.empty();


  if (!have_manifests)
    {
      // Must have been changesetified and rosterified already.
      // Or else the database was just created.
      // Do we need to regenerate cached data?
      E(!have_revisions || (have_rosters && have_heights),
        F("database %s lacks some cached data\n"
          "run '%s db regenerate_caches' to restore use of this database")
        % filename % ui.prog_name);
    }
  else
    {
      // The rosters and heights tables should be empty.
      I(!have_rosters && !have_heights);

      // they need to either changesetify or rosterify.  which?
      if (have_revisions)
        E(false,
          F("database %s contains old-style revisions\n"
            "if you are a project leader or doing local testing:\n"
            "  see the file UPGRADE for instructions on upgrading.\n"
            "if you are not a project leader:\n"
            "  wait for a leader to migrate project data, and then\n"
            "  pull into a fresh database.\n"
            "sorry about the inconvenience.")
          % filename);
      else
        E(false,
          F("database %s contains manifests but no revisions\n"
            "this is a very old database; it needs to be upgraded\n"
            "please see README.changesets for details")
          % filename);
    }
}

static void
sqlite3_gunzip_fn(sqlite3_context *f, int nargs, sqlite3_value ** args)
{
  if (nargs != 1)
    {
      sqlite3_result_error(f, "need exactly 1 arg to gunzip()", -1);
      return;
    }
  data unpacked;
  const char *val = (const char*) sqlite3_value_blob(args[0]);
  int bytes = sqlite3_value_bytes(args[0]);
  decode_gzip(gzip<data>(string(val,val+bytes)), unpacked);
  sqlite3_result_blob(f, unpacked().c_str(), unpacked().size(), SQLITE_TRANSIENT);
}

struct sqlite3 *
database_impl::sql(enum open_mode mode)
{
  if (! __sql)
    {
      check_filename();
      check_db_exists();
      open();

      if (mode != schema_bypass_mode)
        {
          check_sql_schema(__sql, filename);

          if (mode != format_bypass_mode)
            check_format();
        }

      install_functions();
    }
  else
    I(mode == normal_mode);

  return __sql;
}

void
database::initialize()
{
  imp->check_filename();
  imp->check_db_nonexistent();
  imp->open();

  sqlite3 *sql = imp->__sql;

  sqlite3_exec(sql, schema_constant, NULL, NULL, NULL);
  assert_sqlite3_ok(sql);

  sqlite3_exec(sql, (FL("PRAGMA user_version = %u;")
                     % mtn_creator_code).str().c_str(), NULL, NULL, NULL);
  assert_sqlite3_ok(sql);

  // make sure what we wanted is what we got
  check_sql_schema(sql, imp->filename);

  imp->close();
}

struct
dump_request
{
  dump_request() : sql(), out() {};
  struct sqlite3 *sql;
  ostream *out;
};

static void
dump_row(ostream &out, sqlite3_stmt *stmt, string const& table_name)
{
  out << FL("INSERT INTO %s VALUES(") % table_name;
  unsigned n = sqlite3_data_count(stmt);
  for (unsigned i = 0; i < n; ++i)
    {
      if (i != 0)
        out << ',';

      if (sqlite3_column_type(stmt, i) == SQLITE_BLOB)
        {
          out << "X'";
          const char *val = (const char*) sqlite3_column_blob(stmt, i);
          int bytes = sqlite3_column_bytes(stmt, i);
          out << encode_hexenc(string(val,val+bytes));
          out << '\'';
        }
      else
        {
          const unsigned char *val = sqlite3_column_text(stmt, i);
          if (val == NULL)
            out << "NULL";
          else
            {
              out << '\'';
              for (const unsigned char *cp = val; *cp; ++cp)
                {
                  if (*cp == '\'')
                    out << "''";
                  else
                    out << *cp;
                }
              out << '\'';
            }
        }
    }
  out << ");\n";
}

static int
dump_table_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(vals != NULL);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(n == 3);
  I(string(vals[1]) == "table");
  *(dump->out) << vals[2] << ";\n";
  string table_name(vals[0]);
  string query = "SELECT * FROM " + table_name;
  sqlite3_stmt *stmt = 0;
  sqlite3_prepare_v2(dump->sql, query.c_str(), -1, &stmt, NULL);
  assert_sqlite3_ok(dump->sql);

  int stepresult = SQLITE_DONE;
  do
    {
      stepresult = sqlite3_step(stmt);
      I(stepresult == SQLITE_DONE || stepresult == SQLITE_ROW);
      if (stepresult == SQLITE_ROW)
        dump_row(*(dump->out), stmt, table_name);
    }
  while (stepresult == SQLITE_ROW);

  sqlite3_finalize(stmt);
  assert_sqlite3_ok(dump->sql);
  return 0;
}

static int
dump_index_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(vals != NULL);
  I(vals[0] != NULL);
  I(vals[1] != NULL);
  I(vals[2] != NULL);
  I(n == 3);
  I(string(vals[1]) == "index");
  *(dump->out) << vals[2] << ";\n";
  return 0;
}

static int
dump_user_version_cb(void *data, int n, char **vals, char **cols)
{
  dump_request *dump = reinterpret_cast<dump_request *>(data);
  I(dump != NULL);
  I(dump->sql != NULL);
  I(vals != NULL);
  I(vals[0] != NULL);
  I(n == 1);
  *(dump->out) << "PRAGMA user_version = " << vals[0] << ";\n";
  return 0;
}

void
database::dump(ostream & out)
{
  ensure_open_for_maintenance();

  {
    transaction_guard guard(*this);
    dump_request req;
    req.out = &out;
    req.sql = imp->sql();
    out << "BEGIN EXCLUSIVE;\n";
    int res;
    res = sqlite3_exec(req.sql,
                          "SELECT name, type, sql FROM sqlite_master "
                          "WHERE type='table' AND sql NOT NULL "
                          "AND name not like 'sqlite_stat%' "
                          "ORDER BY name",
                          dump_table_cb, &req, NULL);
    assert_sqlite3_ok(req.sql);
    res = sqlite3_exec(req.sql,
                          "SELECT name, type, sql FROM sqlite_master "
                          "WHERE type='index' AND sql NOT NULL "
                          "ORDER BY name",
                          dump_index_cb, &req, NULL);
    assert_sqlite3_ok(req.sql);
    res = sqlite3_exec(req.sql,
                       "PRAGMA user_version;",
                       dump_user_version_cb, &req, NULL);
    assert_sqlite3_ok(req.sql);
    out << "COMMIT;\n";
    guard.commit();
  }
}

void
database::load(istream & in)
{
  string line;
  string sql_stmt;

  imp->check_filename();
  imp->check_db_nonexistent();
  imp->open();

  sqlite3 * sql = imp->__sql;

  // the page size can only be set before any other commands have been executed
  sqlite3_exec(sql, "PRAGMA page_size=8192", NULL, NULL, NULL);
  assert_sqlite3_ok(sql);

  while(in)
    {
      getline(in, line, ';');
      sql_stmt += line + ';';

      if (sqlite3_complete(sql_stmt.c_str()))
        {
          sqlite3_exec(sql, sql_stmt.c_str(), NULL, NULL, NULL);
          assert_sqlite3_ok(sql);
          sql_stmt.clear();
        }
    }

  assert_sqlite3_ok(sql);
}


void
database::debug(string const & sql, ostream & out)
{
  ensure_open_for_maintenance();

  results res;
  imp->fetch(res, any_cols, any_rows, query(sql));
  out << '\'' << sql << "' -> " << res.size() << " rows\n\n";
  for (size_t i = 0; i < res.size(); ++i)
    {
      for (size_t j = 0; j < res[i].size(); ++j)
        {
          if (j != 0)
            out << " | ";
          out << res[i][j];
        }
      out << '\n';
    }
}

// Subroutine of info().  This compares strings that might either be numbers
// or error messages surrounded by square brackets.  We want the longest
// number, even if there's an error message that's longer than that.
static bool longest_number(string a, string b)
{
  if(a.length() > 0 && a[0] == '[')
    return true;  // b is longer
  if(b.length() > 0 && b[0] == '[')
    return false; // a is longer

  return a.length() < b.length();
}

// Subroutine of info() and some things it calls.
// Given an informative_failure which is believed to represent an SQLite
// error, either return a string version of the error message (if it was an
// SQLite error) or rethrow the execption (if it wasn't).
static string
format_sqlite_error_for_info(informative_failure const & e)
{
  string err(e.what());
  string prefix = _("error: ");
  prefix.append(_("sqlite error: "));
  if (err.find(prefix) != 0)
    throw;

  err.replace(0, prefix.length(), "[");
  string::size_type nl = err.find('\n');
  if (nl != string::npos)
    err.erase(nl);

  err.append("]");
  return err;
}

// Subroutine of info().  Pretty-print the database's "creator code", which
// is a 32-bit unsigned number that we interpret as a four-character ASCII
// string, provided that all four characters are graphic.  (On disk, it's
// stored in the "user version" field of the database.)
static string
format_creator_code(u32 code)
{
  char buf[5];
  string result;

  if (code == 0)
    return _("not set");

  buf[4] = '\0';
  buf[3] = ((code & 0x000000ff) >>  0);
  buf[2] = ((code & 0x0000ff00) >>  8);
  buf[1] = ((code & 0x00ff0000) >> 16);
  buf[0] = ((code & 0xff000000) >> 24);

  if (isgraph(buf[0]) && isgraph(buf[1]) && isgraph(buf[2]) && isgraph(buf[3]))
    result = (FL("%s (0x%08x)") % buf % code).str();
  else
    result = (FL("0x%08x") % code).str();
  if (code != mtn_creator_code)
    result += _(" (not a monotone database)");
  return result;
}


void
database::info(ostream & out)
{
  // don't check the schema
  ensure_open_for_maintenance();

  // do a dummy query to confirm that the database file is an sqlite3
  // database.  (this doesn't happen on open() because sqlite postpones the
  // actual file open until the first access.  we can't piggyback it on the
  // query of the user version because there's a bug in sqlite 3.3.10:
  // the routine that reads meta-values from the database header does not
  // check the file format.  reported as sqlite bug #2182.)
  sqlite3_exec(imp->__sql, "SELECT 1 FROM sqlite_master LIMIT 0", 0, 0, 0);
  assert_sqlite3_ok(imp->__sql);

  u32 ccode;
  {
    results res;
    imp->fetch(res, one_col, one_row, query("PRAGMA user_version"));
    I(res.size() == 1);
    ccode = lexical_cast<u32>(res[0][0]);
  }

  vector<string> counts;
  counts.push_back(imp->count("rosters"));
  counts.push_back(imp->count("roster_deltas"));
  counts.push_back(imp->count("files"));
  counts.push_back(imp->count("file_deltas"));
  counts.push_back(imp->count("revisions"));
  counts.push_back(imp->count("revision_ancestry"));
  counts.push_back(imp->count("revision_certs"));

  {
    results res;
    try
      {
        imp->fetch(res, one_col, any_rows,
              query("SELECT node FROM next_roster_node_number"));
        if (res.empty())
          counts.push_back("0");
        else
          {
            I(res.size() == 1);
            u64 n = lexical_cast<u64>(res[0][0]) - 1;
            counts.push_back((F("%u") % n).str());
          }
      }
    catch (informative_failure const & e)
      {
        counts.push_back(format_sqlite_error_for_info(e));
      }
  }

  vector<string> bytes;
  {
    u64 total = 0;
    bytes.push_back(imp->space("rosters",
                          "length(id) + length(checksum) + length(data)",
                          total));
    bytes.push_back(imp->space("roster_deltas",
                          "length(id) + length(checksum)"
                          "+ length(base) + length(delta)", total));
    bytes.push_back(imp->space("files", "length(id) + length(data)", total));
    bytes.push_back(imp->space("file_deltas",
                          "length(id) + length(base) + length(delta)", total));
    bytes.push_back(imp->space("revisions", "length(id) + length(data)", total));
    bytes.push_back(imp->space("revision_ancestry",
                          "length(parent) + length(child)", total));
    bytes.push_back(imp->space("revision_certs",
                          "length(hash) + length(id) + length(name)"
                          "+ length(value) + length(keypair)"
                          "+ length(signature)", total));
    bytes.push_back(imp->space("heights", "length(revision) + length(height)",
                          total));
    bytes.push_back((F("%u") % total).str());
  }

  // pad each vector's strings on the left with spaces to make them all the
  // same length
  {
    string::size_type width
      = max_element(counts.begin(), counts.end(), longest_number)->length();
    for(vector<string>::iterator i = counts.begin(); i != counts.end(); i++)
      if (width > i->length() && (*i)[0] != '[')
        i->insert(0, width - i->length(), ' ');

    width = max_element(bytes.begin(), bytes.end(), longest_number)->length();
    for(vector<string>::iterator i = bytes.begin(); i != bytes.end(); i++)
      if (width > i->length() && (*i)[0] != '[')
        i->insert(0, width - i->length(), ' ');
  }

  i18n_format form =
    F("creator code      : %s\n"
      "schema version    : %s\n"
      "counts:\n"
      "  full rosters    : %s\n"
      "  roster deltas   : %s\n"
      "  full files      : %s\n"
      "  file deltas     : %s\n"
      "  revisions       : %s\n"
      "  ancestry edges  : %s\n"
      "  certs           : %s\n"
      "  logical files   : %s\n"
      "bytes:\n"
      "  full rosters    : %s\n"
      "  roster deltas   : %s\n"
      "  full files      : %s\n"
      "  file deltas     : %s\n"
      "  revisions       : %s\n"
      "  cached ancestry : %s\n"
      "  certs           : %s\n"
      "  heights         : %s\n"
      "  total           : %s\n"
      "database:\n"
      "  page size       : %s\n"
      "  cache size      : %s"
      );

  form = form % format_creator_code(ccode);
  form = form % describe_sql_schema(imp->__sql);

  for (vector<string>::iterator i = counts.begin(); i != counts.end(); i++)
    form = form % *i;

  for (vector<string>::iterator i = bytes.begin(); i != bytes.end(); i++)
    form = form % *i;

  form = form % imp->page_size();
  form = form % imp->cache_size();

  out << form.str() << '\n'; // final newline is kept out of the translation
}

void
database::version(ostream & out)
{
  ensure_open_for_maintenance();
  out << (F("database schema version: %s")
          % describe_sql_schema(imp->__sql)).str()
      << '\n';
}

void
database::migrate(key_store & keys)
{
  ensure_open_for_maintenance();
  migrate_sql_schema(imp->__sql, keys, get_filename());
}

void
database::test_migration_step(key_store & keys, string const & schema)
{
  ensure_open_for_maintenance();
  ::test_migration_step(imp->__sql, keys, get_filename(), schema);
}

void
database::ensure_open()
{
  imp->sql();
}

void
database::ensure_open_for_format_changes()
{
  imp->sql(format_bypass_mode);
}

void
database::ensure_open_for_maintenance()
{
  imp->sql(schema_bypass_mode);
}

void
database_impl::execute(query const & query)
{
  results res;
  fetch(res, 0, 0, query);
}

void
database_impl::fetch(results & res,
                      int const want_cols,
                      int const want_rows,
                      query const & query)
{
  int nrow;
  int ncol;
  int rescode;

  res.clear();
  res.resize(0);

  map<string, statement>::iterator i = statement_cache.find(query.sql_cmd);
  if (i == statement_cache.end())
    {
      statement_cache.insert(make_pair(query.sql_cmd, statement()));
      i = statement_cache.find(query.sql_cmd);
      I(i != statement_cache.end());

      const char * tail;
      sqlite3_prepare_v2(sql(), query.sql_cmd.c_str(), -1, i->second.stmt.paddr(), &tail);
      assert_sqlite3_ok(sql());
      L(FL("prepared statement %s") % query.sql_cmd);

      // no support for multiple statements here
      E(*tail == 0,
        F("multiple statements in query: %s") % query.sql_cmd);
    }

  ncol = sqlite3_column_count(i->second.stmt());

  E(want_cols == any_cols || want_cols == ncol,
    F("wanted %d columns got %d in query: %s") % want_cols % ncol % query.sql_cmd);

  // bind parameters for this execution

  int params = sqlite3_bind_parameter_count(i->second.stmt());

  // Ensure that exactly the right number of parameters were given
  I(params == int(query.args.size()));

  L(FL("binding %d parameters for %s") % params % query.sql_cmd);

  for (int param = 1; param <= params; param++)
    {
      // profiling finds this logging to be quite expensive
      if (global_sanity.debug_p())
        {
          string prefix;
          string log(query.args[param-1].data);

          if (query.args[param-1].type == query_param::blob)
            {
              prefix = "x";
              log = encode_hexenc(log);
            }

          if (log.size() > constants::db_log_line_sz)
            log = log.substr(0, constants::db_log_line_sz - 2) + "..";

          L(FL("binding %d with value '%s'") % param % log);
        }

      switch (idx(query.args, param - 1).type)
        {
        case query_param::text:
          sqlite3_bind_text(i->second.stmt(), param,
                            idx(query.args, param - 1).data.c_str(), -1,
                            SQLITE_STATIC);
          break;
        case query_param::blob:
          {
            string const & data = idx(query.args, param - 1).data;
            sqlite3_bind_blob(i->second.stmt(), param,
                              data.data(), data.size(),
                              SQLITE_STATIC);
          }
          break;
        default:
          I(false);
        }

      assert_sqlite3_ok(sql());
    }

  // execute and process results

  nrow = 0;
  for (rescode = sqlite3_step(i->second.stmt()); rescode == SQLITE_ROW;
       rescode = sqlite3_step(i->second.stmt()))
    {
      vector<string> row;
      for (int col = 0; col < ncol; col++)
        {
          const char * value = (const char*)sqlite3_column_blob(i->second.stmt(), col);
          int bytes = sqlite3_column_bytes(i->second.stmt(), col);
          E(value, F("null result in query: %s") % query.sql_cmd);
          row.push_back(string(value, value + bytes));
          //L(FL("row %d col %d value='%s'") % nrow % col % value);
        }
      res.push_back(row);
    }

  if (rescode != SQLITE_DONE)
    assert_sqlite3_ok(sql());

  sqlite3_reset(i->second.stmt());
  assert_sqlite3_ok(sql());

  nrow = res.size();

  i->second.count++;

  E(want_rows == any_rows || want_rows == nrow,
    F("wanted %d rows got %d in query: %s") % want_rows % nrow % query.sql_cmd);
}

bool
database_impl::table_has_entry(id const & key,
                               std::string const & column,
                               std::string const & table)
{
  results res;
  query q("SELECT 1 FROM " + table + " WHERE " + column + " = ? LIMIT 1");
  fetch(res, one_col, any_rows, q % blob(key()));
  return !res.empty();
}

// general application-level logic

void
database_impl::begin_transaction(bool exclusive)
{
  if (transaction_level == 0)
    {
      I(delayed_files.empty());
      I(roster_cache.all_clean());
      if (exclusive)
        execute(query("BEGIN EXCLUSIVE"));
      else
        execute(query("BEGIN DEFERRED"));
      transaction_exclusive = exclusive;
    }
  else
    {
      // You can't start an exclusive transaction within a non-exclusive
      // transaction
      I(!exclusive || transaction_exclusive);
    }
  transaction_level++;
}


static size_t
size_delayed_file(file_id const & id, file_data const & dat)
{
  return id.inner()().size() + dat.inner()().size();
}

bool
database_impl::have_delayed_file(file_id const & id)
{
  return delayed_files.find(id) != delayed_files.end();
}

void
database_impl::load_delayed_file(file_id const & id, file_data & dat)
{
  dat = safe_get(delayed_files, id);
}

// precondition: have_delayed_file(an_id) == true
void
database_impl::cancel_delayed_file(file_id const & an_id)
{
  file_data const & dat = safe_get(delayed_files, an_id);
  size_t cancel_size = size_delayed_file(an_id, dat);
  I(cancel_size <= delayed_writes_size);
  delayed_writes_size -= cancel_size;

  safe_erase(delayed_files, an_id);
}

void
database_impl::drop_or_cancel_file(file_id const & id)
{
  if (have_delayed_file(id))
    cancel_delayed_file(id);
  else
    drop(id.inner(), "files");
}

void
database_impl::schedule_delayed_file(file_id const & an_id,
                                      file_data const & dat)
{
  if (!have_delayed_file(an_id))
    {
      safe_insert(delayed_files, make_pair(an_id, dat));
      delayed_writes_size += size_delayed_file(an_id, dat);
    }
  if (delayed_writes_size > constants::db_max_delayed_file_bytes)
    flush_delayed_writes