Below is the file 'database.cc' from this revision. You can also download the file.
// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil; c-basic-offset: 2 -*- // vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: // copyright (C) 2002, 2003 graydon hoare <graydon@pobox.com> // copyright (C) 2006 vinzenz feenstra <evilissimo@c-plusplus.de> // all rights reserved. // licensed to the public under the terms of the GNU GPL (>= 2) // see the file COPYING for details #include <algorithm> #include <deque> #include <fstream> #include <iterator> #include <list> #include <set> #include <sstream> #include <vector> #include <string.h> #include <boost/shared_ptr.hpp> #include <boost/lexical_cast.hpp> #include <sqlite3.h> #include "app_state.hh" #include "cert.hh" #include "cleanup.hh" #include "constants.hh" #include "database.hh" #include "hash_map.hh" #include "keys.hh" #include "sanity.hh" #include "schema_migration.hh" #include "transforms.hh" #include "ui.hh" #include "vocab.hh" #include "xdelta.hh" #include "epoch.hh" #include "revision.hh" // defined in schema.sql, converted to header: #include "schema.h" // defined in views.sql, converted to header: #include "views.h" // this file defines a public, typed interface to the database. // the database class encapsulates all knowledge about sqlite, // the schema, and all SQL statements used to access the schema. // // see file schema.sql for the text of the schema. using boost::shared_ptr; using boost::lexical_cast; using namespace std; int const one_row = 1; int const one_col = 1; int const any_rows = -1; int const any_cols = -1; namespace { struct query_param { enum arg_type { text, blob }; arg_type type; std::string data; }; query_param text(std::string const & txt) { query_param q = { query_param::text, txt, }; return q; } query_param blob(std::string const & blb) { query_param q = { query_param::blob, blb, }; return q; } // track all open databases for close_all_databases() handler set<sqlite3*> sql_contexts; } struct query { query(std::string const & cmd) : sql_cmd(cmd) {} query & operator %(query_param const & qp) { args.push_back(qp); return *this; } std::vector<query_param> args; std::string sql_cmd; }; extern "C" { // some wrappers to ease migration const char *sqlite3_value_text_s(sqlite3_value *v); const char *sqlite3_column_text_s(sqlite3_stmt*, int col); } database::database(system_path const & fn) : filename(fn), // nb. update this if you change the schema. unfortunately we are not // using self-digesting schemas due to comment irregularities and // non-alphabetic ordering of tables in sql source files. we could create // a temporary db, write our intended schema into it, and read it back, // but this seems like it would be too rude. possibly revisit this issue. schema("1db80c7cee8fa966913db1a463ed50bf1b0e5b0e"), __sql(NULL), transaction_level(0) {} void database::check_schema() { string db_schema_id; calculate_schema_id (__sql, db_schema_id); N (schema == db_schema_id, F("layout of database %s doesn't match this version of monotone\n" "wanted schema %s, got %s\n" "try 'monotone db migrate' to upgrade\n" "(this is irreversible; you may want to make a backup copy first)") % filename % schema % db_schema_id); } void database::check_format() { results res_revisions; string manifests_query = "SELECT 1 FROM manifests LIMIT 1"; string revisions_query = "SELECT 1 FROM revisions LIMIT 1"; string rosters_query = "SELECT 1 FROM rosters LIMIT 1"; fetch(res_revisions, one_col, any_rows, query(revisions_query)); if (res_revisions.size() > 0) { // they have revisions, so they can't be _ancient_, but they still might // not have rosters results res_rosters; fetch(res_rosters, one_col, any_rows, query(rosters_query)); N(res_rosters.size() != 0, F("database %s contains revisions but no rosters\n" "if you are a project leader or doing local testing:\n" " see the file UPGRADE for instructions on upgrading.\n" "if you are not a project leader:\n" " wait for a leader to migrate project data, and then\n" " pull into a fresh database.\n" "sorry about the inconvenience.") % filename); } else { // they have no revisions, so they shouldn't have any manifests either. // if they do, their db is probably ancient. (though I guess you could // trigger this check by taking a pre-roster monotone, doing "db // init; commit; db kill_rev_locally", and then upgrading to a // rosterified monotone.) results res_manifests; fetch(res_manifests, one_col, any_rows, query(manifests_query)); N(res_manifests.size() == 0, F("database %s contains manifests but no revisions\n" "this is a very old database; it needs to be upgraded\n" "please see README.changesets for details") % filename); } } // sqlite3_value_text gives a const unsigned char * but most of the time // we need a signed char const char * sqlite3_value_text_s(sqlite3_value *v) { return (const char *)(sqlite3_value_text(v)); } const char * sqlite3_column_text_s(sqlite3_stmt *stmt, int col) { return (const char *)(sqlite3_column_text(stmt, col)); } static void sqlite3_unbase64_fn(sqlite3_context *f, int nargs, sqlite3_value ** args) { if (nargs != 1) { sqlite3_result_error(f, "need exactly 1 arg to unbase64()", -1); return; } data decoded; decode_base64(base64<data>(string(sqlite3_value_text_s(args[0]))), decoded); sqlite3_result_blob(f, decoded().c_str(), decoded().size(), SQLITE_TRANSIENT); } static void sqlite3_unpack_fn(sqlite3_context *f, int nargs, sqlite3_value ** args) { if (nargs != 1) { sqlite3_result_error(f, "need exactly 1 arg to unpack()", -1); return; } data unpacked; unpack(base64< gzip<data> >(string(sqlite3_value_text_s(args[0]))), unpacked); sqlite3_result_blob(f, unpacked().c_str(), unpacked().size(), SQLITE_TRANSIENT); } void database::set_app(app_state * app) { __app = app; } static void check_sqlite_format_version(system_path const & filename) { // sqlite 3 files begin with this constant string // (version 2 files begin with a different one) std::string version_string("SQLite format 3"); std::ifstream file(filename.as_external().c_str()); N(file, F("unable to probe database version in file %s") % filename); for (std::string::const_iterator i = version_string.begin(); i != version_string.end(); ++i) { char c; file.get(c); N(c == *i, F("database %s is not an sqlite version 3 file, " "try dump and reload") % filename); } } static void assert_sqlite3_ok(sqlite3 *s) { int errcode = sqlite3_errcode(s); if (errcode == SQLITE_OK) return; const char * errmsg = sqlite3_errmsg(s); // sometimes sqlite is not very helpful // so we keep a table of errors people have gotten and more helpful versions if (errcode != SQLITE_OK) { // first log the code so we can find _out_ what the confusing code // was... note that code does not uniquely identify the errmsg, unlike // errno's. L(FL("sqlite error: %d: %s") % errcode % errmsg); } std::string auxiliary_message = ""; if (errcode == SQLITE_ERROR) { auxiliary_message += _("make sure database and containing directory are writeable\n" "and you have not run out of disk space"); } // if the last message is empty, the \n will be stripped off too E(errcode == SQLITE_OK, // kind of string surgery to avoid ~duplicate strings F("sqlite error: %d: %s\n%s") % errcode % errmsg % auxiliary_message); } struct sqlite3 * database::sql(bool init, bool migrating_format) { if (! __sql) { check_filename(); if (! init) { check_db_exists(); check_sqlite_format_version(filename); } open(); if (init) { sqlite3_exec(__sql, schema_constant, NULL, NULL, NULL); assert_sqlite3_ok(__sql); } check_schema(); install_functions(__app); install_views(); if (!migrating_format) check_format(); } else { I(!init); I(!migrating_format); } return __sql; } void database::initialize() { if (__sql) throw oops("cannot initialize database while it is open"); require_path_is_nonexistent(filename, F("could not initialize database: %s: already exists") % filename); system_path journal(filename.as_internal() + "-journal"); require_path_is_nonexistent(journal, F("existing (possibly stale) journal file '%s' " "has same stem as new database '%s'\n" "cancelling database creation") % journal % filename); sqlite3 *s = sql(true); I(s != NULL); } struct dump_request { dump_request() {}; struct sqlite3 *sql; string table_name; ostream *out; }; static int dump_row_cb(void *data, int n, char **vals, char **cols) { dump_request *dump = reinterpret_cast<dump_request *>(data); I(dump != NULL); I(vals != NULL); I(dump->out != NULL); *(dump->out) << boost::format("INSERT INTO %s VALUES(") % dump->table_name; for (int i = 0; i < n; ++i) { if (i != 0) *(dump->out) << ','; if (vals[i] == NULL) *(dump->out) << "NULL"; else { *(dump->out) << "'"; for (char *cp = vals[i]; *cp; ++cp) { if (*cp == '\'') *(dump->out) << "''"; else *(dump->out) << *cp; } *(dump->out) << "'"; } } *(dump->out) << ");\n"; return 0; } static int dump_table_cb(void *data, int n, char **vals, char **cols) { dump_request *dump = reinterpret_cast<dump_request *>(data); I(dump != NULL); I(dump->sql != NULL); I(vals != NULL); I(vals[0] != NULL); I(vals[1] != NULL); I(vals[2] != NULL); I(n == 3); I(string(vals[1]) == "table"); *(dump->out) << vals[2] << ";\n"; dump->table_name = string(vals[0]); string query = "SELECT * FROM " + string(vals[0]); sqlite3_exec(dump->sql, query.c_str(), dump_row_cb, data, NULL); assert_sqlite3_ok(dump->sql); return 0; } static int dump_index_cb(void *data, int n, char **vals, char **cols) { dump_request *dump = reinterpret_cast<dump_request *>(data); I(dump != NULL); I(dump->sql != NULL); I(vals != NULL); I(vals[0] != NULL); I(vals[1] != NULL); I(vals[2] != NULL); I(n == 3); I(string(vals[1]) == "index"); *(dump->out) << vals[2] << ";\n"; return 0; } void database::dump(ostream & out) { transaction_guard guard(*this); dump_request req; req.out = &out; req.sql = sql(); out << "BEGIN EXCLUSIVE;\n"; int res; res = sqlite3_exec(req.sql, "SELECT name, type, sql FROM sqlite_master " "WHERE type='table' AND sql NOT NULL " "AND name not like 'sqlite_stat%' " "ORDER BY name", dump_table_cb, &req, NULL); assert_sqlite3_ok(req.sql); res = sqlite3_exec(req.sql, "SELECT name, type, sql FROM sqlite_master " "WHERE type='index' AND sql NOT NULL " "ORDER BY name", dump_index_cb, &req, NULL); assert_sqlite3_ok(req.sql); out << "COMMIT;\n"; guard.commit(); } void database::load(istream & in) { string line; string sql_stmt; check_filename(); require_path_is_nonexistent(filename, F("cannot create %s; it already exists") % filename); open(); while(in) { getline(in, line, ';'); sql_stmt += line + ';'; if (sqlite3_complete(sql_stmt.c_str())) { sqlite3_exec(__sql, sql_stmt.c_str(), NULL, NULL, NULL); sql_stmt.clear(); } } assert_sqlite3_ok(__sql); } void database::debug(string const & sql, ostream & out) { results res; fetch(res, any_cols, any_rows, query(sql)); out << "'" << sql << "' -> " << res.size() << " rows\n" << endl; for (size_t i = 0; i < res.size(); ++i) { for (size_t j = 0; j < res[i].size(); ++j) { if (j != 0) out << " | "; out << res[i][j]; } out << endl; } } namespace { unsigned long add(unsigned long count, unsigned long & total) { total += count; return count; } } void database::info(ostream & out) { string id; calculate_schema_id(sql(), id); unsigned long total = 0UL; #define SPACE_USAGE(TABLE, COLS) add(space_usage(TABLE, COLS), total) out << \ F("schema version : %s\n" "counts:\n" " full rosters : %u\n" " roster deltas : %u\n" " full files : %u\n" " file deltas : %u\n" " revisions : %u\n" " ancestry edges : %u\n" " certs : %u\n" "bytes:\n" " full rosters : %u\n" " roster deltas : %u\n" " full files : %u\n" " file deltas : %u\n" " revisions : %u\n" " cached ancestry : %u\n" " certs : %u\n" " total : %u\n" ) % id // counts % count("rosters") % count("roster_deltas") % count("files") % count("file_deltas") % count("revisions") % count("revision_ancestry") % count("revision_certs") // bytes % SPACE_USAGE("rosters", "id || data") % SPACE_USAGE("roster_deltas", "id || base || delta") % SPACE_USAGE("files", "id || data") % SPACE_USAGE("file_deltas", "id || base || delta") % SPACE_USAGE("revisions", "id || data") % SPACE_USAGE("revision_ancestry", "parent || child") % SPACE_USAGE("revision_certs", "hash || id || name || value || keypair || signature") % total; #undef SPACE_USAGE } void database::version(ostream & out) { string id; check_filename(); check_db_exists(); open(); calculate_schema_id(__sql, id); close(); out << F("database schema version: %s") % id << endl; } void database::migrate() { check_filename(); check_db_exists(); open(); migrate_monotone_schema(__sql, __app); close(); } void database::ensure_open() { sqlite3 *s = sql(); I(s != NULL); } void database::ensure_open_for_format_changes() { sqlite3 *s = sql(false, true); I(s != NULL); } database::~database() { L(FL("statement cache statistics\n")); L(FL("prepared %d statements\n") % statement_cache.size()); for (map<string, statement>::const_iterator i = statement_cache.begin(); i != statement_cache.end(); ++i) L(FL("%d executions of %s\n") % i->second.count % i->first); // trigger destructors to finalize cached statements statement_cache.clear(); close(); } void database::execute(query const & query) { results res; fetch(res, 0, 0, query ); } void database::fetch(results & res, int const want_cols, int const want_rows, query const & query) { int nrow; int ncol; int rescode; res.clear(); res.resize(0); map<string, statement>::iterator i = statement_cache.find(query.sql_cmd); if (i == statement_cache.end()) { statement_cache.insert(make_pair(query.sql_cmd, statement())); i = statement_cache.find(query.sql_cmd); I(i != statement_cache.end()); const char * tail; sqlite3_prepare(sql(), query.sql_cmd.c_str(), -1, i->second.stmt.paddr(), &tail); assert_sqlite3_ok(sql()); L(FL("prepared statement %s\n") % query.sql_cmd); // no support for multiple statements here E(*tail == 0, F("multiple statements in query: %s\n") % query.sql_cmd); } ncol = sqlite3_column_count(i->second.stmt()); E(want_cols == any_cols || want_cols == ncol, F("wanted %d columns got %d in query: %s\n") % want_cols % ncol % query.sql_cmd); // bind parameters for this execution int params = sqlite3_bind_parameter_count(i->second.stmt()); // Ensure that exactly the right number of parameters were given I(params == int(query.args.size())); // profiling finds this logging to be quite expensive if (global_sanity.debug) L(FL("binding %d parameters for %s\n") % params % query.sql_cmd); for (int param = 1; param <= params; param++) { // profiling finds this logging to be quite expensive if (global_sanity.debug) { string log = query.args[param-1].data; if (log.size() > constants::log_line_sz) log = log.substr(0, constants::log_line_sz); L(FL("binding %d with value '%s'\n") % param % log); } switch (idx(query.args, param - 1).type) { case query_param::text: sqlite3_bind_text(i->second.stmt(), param, idx(query.args, param - 1).data.c_str(), -1, SQLITE_STATIC); break; case query_param::blob: { std::string const & data = idx(query.args, param - 1).data; sqlite3_bind_blob(i->second.stmt(), param, data.data(), data.size(), SQLITE_STATIC); } break; default: I(false); } assert_sqlite3_ok(sql()); } // execute and process results nrow = 0; for (rescode = sqlite3_step(i->second.stmt()); rescode == SQLITE_ROW; rescode = sqlite3_step(i->second.stmt())) { vector<string> row; for (int col = 0; col < ncol; col++) { const char * value = sqlite3_column_text_s(i->second.stmt(), col); E(value, F("null result in query: %s\n") % query.sql_cmd); row.push_back(value); //L(FL("row %d col %d value='%s'\n") % nrow % col % value); } res.push_back(row); } if (rescode != SQLITE_DONE) assert_sqlite3_ok(sql()); sqlite3_reset(i->second.stmt()); assert_sqlite3_ok(sql()); nrow = res.size(); i->second.count++; E(want_rows == any_rows || want_rows == nrow, F("wanted %d rows got %s in query: %s\n") % want_rows % nrow % query.sql_cmd); } // general application-level logic void database::set_filename(system_path const & file) { I(!__sql); filename = file; } void database::begin_transaction(bool exclusive) { if (transaction_level == 0) { if (exclusive) execute(query("BEGIN EXCLUSIVE")); else execute(query("BEGIN DEFERRED")); transaction_exclusive = exclusive; } else { // You can't start an exclusive transaction within a non-exclusive // transaction I(!exclusive || transaction_exclusive); } transaction_level++; } void database::commit_transaction() { if (transaction_level == 1) execute(query("COMMIT")); transaction_level--; } void database::rollback_transaction() { if (transaction_level == 1) execute(query("ROLLBACK")); transaction_level--; } bool database::exists(hexenc<id> const & ident, string const & table) { results res; query q("SELECT id FROM " + table + " WHERE id = ?"); fetch(res, one_col, any_rows, q % text(ident())); if (res.size() > 1) { for (results::const_iterator i = res.begin(); i != res.end(); i++) { L(FL("%s") % (*i)[0]); } } I((res.size() == 1) || (res.size() == 0)); return res.size() == 1; } bool database::delta_exists(hexenc<id> const & ident, string const & table) { results res; query q("SELECT id FROM " + table + " WHERE id = ?"); fetch(res, one_col, any_rows, q % text(ident())); return res.size() > 0; } unsigned long database::count(string const & table) { results res; query q("SELECT COUNT(*) FROM " + table); fetch(res, one_col, one_row, q); return lexical_cast<unsigned long>(res[0][0]); } unsigned long database::space_usage(string const & table, string const & concatenated_columns) { results res; // COALESCE is required since SUM({empty set}) is NULL. // the sqlite docs for SUM suggest this as a workaround query q("SELECT COALESCE(SUM(LENGTH(" + concatenated_columns + ")), 0) FROM " + table); fetch(res, one_col, one_row, q); return lexical_cast<unsigned long>(res[0][0]); } void database::get_ids(string const & table, set< hexenc<id> > & ids) { results res; query q("SELECT id FROM " + table); fetch(res, one_col, any_rows, q); for (size_t i = 0; i < res.size(); ++i) { ids.insert(hexenc<id>(res[i][0])); } } void database::get(hexenc<id> const & ident, data & dat, string const & table) { results res; query q("SELECT data FROM " + table + " WHERE id = ?"); fetch(res, one_col, one_row, q % text(ident())); // consistency check base64<gzip<data> > rdata(res[0][0]); data rdata_unpacked; unpack(rdata, rdata_unpacked); hexenc<id> tid; calculate_ident(rdata_unpacked, tid); I(tid == ident); dat = rdata_unpacked; } void database::get_delta(hexenc<id> const & ident, hexenc<id> const & base, delta & del, string const & table) { I(ident() != ""); I(base() != ""); results res; query q("SELECT delta FROM " + table + " WHERE id = ? AND base = ?"); fetch(res, one_col, one_row, q % text(ident()) % text(base())); base64<gzip<delta> > del_packed = res[0][0]; unpack(del_packed, del); } void database::put(hexenc<id> const & ident, data const & dat, string const & table) { // consistency check I(ident() != ""); hexenc<id> tid; calculate_ident(dat, tid); MM(ident); MM(tid); I(tid == ident); base64<gzip<data> > dat_packed; pack(dat, dat_packed); string insert = "INSERT INTO " + table + " VALUES(?, ?)"; execute(query(insert) % text(ident()) % text(dat_packed())); } void database::put_delta(hexenc<id> const & ident, hexenc<id> const & base, delta const & del, string const & table) { // nb: delta schema is (id, base, delta) I(ident() != ""); I(base() != ""); base64<gzip<delta> > del_packed; pack(del, del_packed); string insert = "INSERT INTO "+table+" VALUES(?, ?, ?)"; execute(query(insert) % text(ident()) % text(base()) % text(del_packed())); } // static ticker cache_hits("vcache hits", "h", 1); struct version_cache { size_t capacity; size_t use; std::map<hexenc<id>, data> cache; version_cache() : capacity(constants::db_version_cache_sz), use(0) { srand(time(NULL)); } void put(hexenc<id> const & ident, data const & dat) { while (!cache.empty() && use + dat().size() > capacity) { std::string key = (boost::format("%08.8x%08.8x%08.8x%08.8x%08.8x") % rand() % rand() % rand() % rand() % rand()).str(); std::map<hexenc<id>, data>::const_iterator i; i = cache.lower_bound(hexenc<id>(key)); if (i == cache.end()) { // we can't find a random entry, probably there's only one // entry and we missed it. delete first entry instead. i = cache.begin(); } I(i != cache.end()); I(use >= i->second().size()); //L(FL("version cache expiring %s\n") % i->first); use -= i->second().size(); cache.erase(i->first); } cache.insert(std::make_pair(ident, dat)); use += dat().size(); } bool exists(hexenc<id> const & ident) { std::map<hexenc<id>, data>::const_iterator i; i = cache.find(ident); return i != cache.end(); } bool get(hexenc<id> const & ident, data & dat) { std::map<hexenc<id>, data>::const_iterator i; i = cache.find(ident); if (i == cache.end()) return false; // ++cache_hits; //L(FL("version cache hit on %s\n") % ident); dat = i->second; return true; } }; static version_cache vcache; typedef vector< hexenc<id> > version_path; static void extend_path_if_not_cycle(string table_name, shared_ptr<version_path> p, hexenc<id> const & ext, set< hexenc<id> > & seen_nodes, vector< shared_ptr<version_path> > & next_paths) { for (version_path::const_iterator i = p->begin(); i != p->end(); ++i) { if ((*i)() == ext()) throw oops("cycle in table '" + table_name + "', at node " + (*i)() + " <- " + ext()); } if (seen_nodes.find(ext) == seen_nodes.end()) { p->push_back(ext); next_paths.push_back(p); seen_nodes.insert(ext); } } void database::get_version(hexenc<id> const & ident, data & dat, string const & data_table, string const & delta_table) { I(ident() != ""); if (vcache.get(ident, dat)) { return; } else if (exists(ident, data_table)) { // easy path get(ident, dat, data_table); } else { // tricky path // we start from the file we want to reconstruct and work *forwards* // through the database, until we get to a full data object. we then // trace back through the list of edges we followed to get to the data // object, applying reverse deltas. // // the effect of this algorithm is breadth-first search, backwards // through the storage graph, to discover a forwards shortest path, and // then following that shortest path with delta application. // // we used to do this with the boost graph library, but it invovled // loading too much of the storage graph into memory at any moment. this // imperative version only loads the descendents of the reconstruction // node, so it much cheaper in terms of memory. // // we also maintain a cycle-detecting set, just to be safe L(FL("reconstructing %s in %s\n") % ident % delta_table); I(delta_exists(ident, delta_table)); // Our reconstruction algorithm involves keeping a set of parallel // linear paths, starting from ident, moving forward through the // storage DAG until we hit a storage root. // // On each iteration, we extend every active path by one step. If our // extension involves a fork, we duplicate the path. If any path // contains a cycle, we fault. // // If, by extending a path C, we enter a node which another path // D has already seen, we kill path C. This avoids the possibility of // exponential growth in the number of paths due to extensive forking // and merging. vector< shared_ptr<version_path> > live_paths; string delta_query = "SELECT base FROM " + delta_table + " WHERE id = ?"; { shared_ptr<version_path> pth0 = shared_ptr<version_path>(new version_path()); pth0->push_back(ident); live_paths.push_back(pth0); } shared_ptr<version_path> selected_path; set< hexenc<id> > seen_nodes; while (!selected_path) { vector< shared_ptr<version_path> > next_paths; for (vector<shared_ptr<version_path> >::const_iterator i = live_paths.begin(); i != live_paths.end(); ++i) { shared_ptr<version_path> pth = *i; hexenc<id> tip = pth->back(); if (vcache.exists(tip) || exists(tip, data_table)) { selected_path = pth; break; } else { // This tip is not a root, so extend the path. results res; fetch(res, one_col, any_rows, query(delta_query) % text(tip())); I(res.size() != 0); // Replicate the path if there's a fork. for (size_t k = 1; k < res.size(); ++k) { shared_ptr<version_path> pthN = shared_ptr<version_path>(new version_path(*pth)); extend_path_if_not_cycle(delta_table, pthN, hexenc<id>(res[k][0]), seen_nodes, next_paths); } // And extend the base path we're examining. extend_path_if_not_cycle(delta_table, pth, hexenc<id>(res[0][0]), seen_nodes, next_paths); } } I(selected_path || !next_paths.empty()); live_paths = next_paths; } // Found a root, now trace it back along the path. I(selected_path); I(selected_path->size() > 1); hexenc<id> curr = selected_path->back(); selected_path->pop_back(); data begin; if (vcache.exists(curr)) { I(vcache.get(curr, begin)); } else { get(curr, begin, data_table); } boost::shared_ptr<delta_applicator> app = new_piecewise_applicator(); app->begin(begin()); for (version_path::reverse_iterator i = selected_path->rbegin(); i != selected_path->rend(); ++i) { hexenc<id> const nxt = *i; if (!vcache.exists(curr)) { string tmp; app->finish(tmp); vcache.put(curr, tmp); } L(FL("following delta %s -> %s\n") % curr % nxt); delta del; get_delta(nxt, curr, del, delta_table); apply_delta (app, del()); app->next(); curr = nxt; } string tmp; app->finish(tmp); dat = data(tmp); hexenc<id> final; calculate_ident(dat, final); I(final == ident); } vcache.put(ident, dat); } void database::drop(hexenc<id> const & ident, string const & table) { string drop = "DELETE FROM " + table + " WHERE id = ?"; execute(query(drop) % text(ident())); } // insert the given new data using old_id as a hint // as to ancestry void database::put_version(hexenc<id> const & old_id, hexenc<id> const & new_id, data const & new_dat, string const & data_table, string const & delta_table) { transaction_guard guard(*this); if (exists(new_id, data_table) || exists(new_id, delta_table)) return; hexenc<id> base_id; MM(base_id); if (exists(old_id, data_table)) { base_id = old_id; } else { // XXX: this relies on single step deltas, should probably be something // more like get_version()'s path following. string delta_query = "SELECT base FROM " + delta_table + " WHERE id = ?"; results res; fetch(res, one_col, any_rows, query(delta_query) % text(old_id())); I(res.size() != 0); base_id = hexenc<id>(res[0][0]); I(exists(base_id, data_table)); } data base_dat; get(base_id, base_dat, data_table); delta del; diff(base_dat, new_dat, del); static ticker full("full", "f", 1); static ticker against("ag", "g", 1); // TODO: size comparison stuff. if (del().size() < 0.15 * new_dat().size()) { ++against; L(FL("put_version del %s -> %s (%s)") % base_id % new_id % delta_table); put_delta(new_id, base_id, del, delta_table); } else { ++full; L(FL("put_version dat %s (%s)") % new_id % data_table); put(new_id, new_dat, data_table); } guard.commit(); } void database::remove_version(hexenc<id> const & target_id, string const & data_table, string const & delta_table) { E(false, F("needs updating for against-base")); // We have a one of two cases (for multiple 'older' nodes): // // 1. pre: older <- target <- newer // post: older <- newer // // 2. pre: older <- target (a root) // post: older (a root) // // In case 1 we want to build new deltas bypassing the target we're // removing. In case 2 we just promote the older object to a root. transaction_guard guard(*this); I(exists(target_id, data_table) || delta_exists(target_id, delta_table)); map<hexenc<id>, data> older; { results res; query q("SELECT id FROM " + delta_table + " WHERE base = ?"); fetch(res, one_col, any_rows, q % text(target_id())); for (size_t i = 0; i < res.size(); ++i) { hexenc<id> old_id(res[i][0]); data old_data; get_version(old_id, old_data, data_table, delta_table); older.insert(make_pair(old_id, old_data)); } } if (delta_exists(target_id, delta_table