// Below is the file 'database.cc' from this revision. (A downloadable copy of the file is also available.)
// Copyright (C) 2002 Graydon Hoare <graydon@pobox.com> // // This program is made available under the GNU GPL version 2.0 or // greater. See the accompanying file COPYING for details. // // This program is distributed WITHOUT ANY WARRANTY; without even the // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // PURPOSE. #include <algorithm> #include <deque> #include <fstream> #include <iterator> #include <list> #include <set> #include <sstream> #include <vector> #include <string.h> #include <boost/shared_ptr.hpp> #include <boost/lexical_cast.hpp> #include <sqlite3.h> #include "app_state.hh" #include "cert.hh" #include "cleanup.hh" #include "constants.hh" #include "database.hh" #include "hash_map.hh" #include "keys.hh" #include "revision.hh" #include "safe_map.hh" #include "sanity.hh" #include "schema_migration.hh" #include "transforms.hh" #include "ui.hh" #include "vocab.hh" #include "xdelta.hh" #include "epoch.hh" #include "hash_map.hh" #undef _REENTRANT #include "lru_cache.h" // defined in schema.sql, converted to header: #include "schema.h" // this file defines a public, typed interface to the database. // the database class encapsulates all knowledge about sqlite, // the schema, and all SQL statements used to access the schema. // // see file schema.sql for the text of the schema. 
using std::deque; using std::endl; using std::istream; using std::ifstream; using std::make_pair; using std::map; using std::multimap; using std::ostream; using std::pair; using std::set; using std::string; using std::vector; using boost::shared_ptr; using boost::lexical_cast; using hashmap::hash_set; int const one_row = 1; int const one_col = 1; int const any_rows = -1; int const any_cols = -1; namespace { struct query_param { enum arg_type { text, blob }; arg_type type; string data; }; query_param text(string const & txt) { query_param q = { query_param::text, txt, }; return q; } query_param blob(string const & blb) { query_param q = { query_param::blob, blb, }; return q; } // track all open databases for close_all_databases() handler set<sqlite3*> sql_contexts; } struct query { explicit query(string const & cmd) : sql_cmd(cmd) {} query() {} query & operator %(query_param const & qp) { args.push_back(qp); return *this; } vector<query_param> args; string sql_cmd; }; database::database(system_path const & fn) : filename(fn), // nb. update this if you change the schema. unfortunately we are not // using self-digesting schemas due to comment irregularities and // non-alphabetic ordering of tables in sql source files. we could create // a temporary db, write our intended schema into it, and read it back, // but this seems like it would be too rude. possibly revisit this issue. schema("9d2b5d7b86df00c30ac34fe87a3c20f1195bb2df"), pending_writes_size(0), __sql(NULL), transaction_level(0) {} bool database::is_dbfile(any_path const & file) { system_path fn(file);// why is this needed? 
bool same = (filename.as_internal() == fn.as_internal()); if (same) L(FL("'%s' is the database file") % file); return same; } void database::check_schema() { string db_schema_id; calculate_schema_id (__sql, db_schema_id); N (schema == db_schema_id, F("layout of database %s doesn't match this version of monotone\n" "wanted schema %s, got %s\n" "try '%s db migrate' to upgrade\n" "(this is irreversible; you may want to make a backup copy first)") % filename % schema % db_schema_id % ui.prog_name); } void database::check_is_not_rosterified() { results res; string rosters_query = "SELECT 1 FROM rosters LIMIT 1"; fetch(res, one_col, any_rows, query(rosters_query)); N(res.empty(), F("this database already contains rosters")); } void database::check_format() { results res_revisions; string manifests_query = "SELECT 1 FROM manifests LIMIT 1"; string revisions_query = "SELECT 1 FROM revisions LIMIT 1"; string rosters_query = "SELECT 1 FROM rosters LIMIT 1"; fetch(res_revisions, one_col, any_rows, query(revisions_query)); if (!res_revisions.empty()) { // they have revisions, so they can't be _ancient_, but they still might // not have rosters results res_rosters; fetch(res_rosters, one_col, any_rows, query(rosters_query)); N(!res_rosters.empty(), F("database %s contains revisions but no rosters\n" "if you are a project leader or doing local testing:\n" " see the file UPGRADE for instructions on upgrading.\n" "if you are not a project leader:\n" " wait for a leader to migrate project data, and then\n" " pull into a fresh database.\n" "sorry about the inconvenience.") % filename); } else { // they have no revisions, so they shouldn't have any manifests either. // if they do, their db is probably ancient. (though I guess you could // trigger this check by taking a pre-roster monotone, doing "db // init; commit; db kill_rev_locally", and then upgrading to a // rosterified monotone.) 
results res_manifests; fetch(res_manifests, one_col, any_rows, query(manifests_query)); N(res_manifests.empty(), F("database %s contains manifests but no revisions\n" "this is a very old database; it needs to be upgraded\n" "please see README.changesets for details") % filename); } } static void sqlite3_gunzip_fn(sqlite3_context *f, int nargs, sqlite3_value ** args) { if (nargs != 1) { sqlite3_result_error(f, "need exactly 1 arg to gunzip()", -1); return; } data unpacked; const char *val = (const char*) sqlite3_value_blob(args[0]); int bytes = sqlite3_value_bytes(args[0]); decode_gzip(gzip<data>(string(val,val+bytes)), unpacked); sqlite3_result_blob(f, unpacked().c_str(), unpacked().size(), SQLITE_TRANSIENT); } void database::set_app(app_state * app) { __app = app; } static void check_sqlite_format_version(system_path const & filename) { // sqlite 3 files begin with this constant string // (version 2 files begin with a different one) string version_string("SQLite format 3"); ifstream file(filename.as_external().c_str()); N(file, F("unable to probe database version in file %s") % filename); for (string::const_iterator i = version_string.begin(); i != version_string.end(); ++i) { char c; file.get(c); N(c == *i, F("database %s is not an sqlite version 3 file, " "try dump and reload") % filename); } } static void assert_sqlite3_ok(sqlite3 *s) { int errcode = sqlite3_errcode(s); if (errcode == SQLITE_OK) return; const char * errmsg = sqlite3_errmsg(s); // sometimes sqlite is not very helpful // so we keep a table of errors people have gotten and more helpful versions if (errcode != SQLITE_OK) { // first log the code so we can find _out_ what the confusing code // was... note that code does not uniquely identify the errmsg, unlike // errno's. L(FL("sqlite error: %d: %s") % errcode % errmsg); } // note: if you update this, try to keep calculate_schema_id() in // schema_migration.cc consistent. 
string auxiliary_message = ""; if (errcode == SQLITE_ERROR) { auxiliary_message += _("make sure database and containing directory are writeable\n" "and you have not run out of disk space"); } // if the last message is empty, the \n will be stripped off too E(errcode == SQLITE_OK, // kind of string surgery to avoid ~duplicate strings F("sqlite error: %s\n%s") % errmsg % auxiliary_message); } struct sqlite3 * database::sql(bool init, bool migrating_format) { if (! __sql) { check_filename(); if (! init) { check_db_exists(); check_sqlite_format_version(filename); } open(); if (init) { sqlite3_exec(__sql, schema_constant, NULL, NULL, NULL); assert_sqlite3_ok(__sql); } check_schema(); install_functions(__app); if (!migrating_format) check_format(); } else { I(!init); I(!migrating_format); } return __sql; } void database::initialize() { if (__sql) throw oops("cannot initialize database while it is open"); require_path_is_nonexistent(filename, F("could not initialize database: %s: already exists") % filename); system_path journal(filename.as_internal() + "-journal"); require_path_is_nonexistent(journal, F("existing (possibly stale) journal file '%s' " "has same stem as new database '%s'\n" "cancelling database creation") % journal % filename); sqlite3 *s = sql(true); I(s != NULL); } struct dump_request { dump_request() : sql(), out() {}; struct sqlite3 *sql; ostream *out; }; static void dump_row(ostream &out, sqlite3_stmt *stmt, string const& table_name) { out << FL("INSERT INTO %s VALUES(") % table_name; unsigned n = sqlite3_data_count(stmt); for (unsigned i = 0; i < n; ++i) { if (i != 0) out << ','; if (sqlite3_column_type(stmt, i) == SQLITE_BLOB) { out << "X'"; const char *val = (const char*) sqlite3_column_blob(stmt, i); int bytes = sqlite3_column_bytes(stmt, i); out << encode_hexenc(string(val,val+bytes)); out << "'"; } else { const unsigned char *val = sqlite3_column_text(stmt, i); if (val == NULL) out << "NULL"; else { out << "'"; for (const unsigned char *cp = val; 
*cp; ++cp) { if (*cp == '\'') out << "''"; else out << *cp; } out << "'"; } } } out << ");\n"; } static int dump_table_cb(void *data, int n, char **vals, char **cols) { dump_request *dump = reinterpret_cast<dump_request *>(data); I(dump != NULL); I(dump->sql != NULL); I(vals != NULL); I(vals[0] != NULL); I(vals[1] != NULL); I(vals[2] != NULL); I(n == 3); I(string(vals[1]) == "table"); *(dump->out) << vals[2] << ";\n"; string table_name(vals[0]); string query = "SELECT * FROM " + table_name; sqlite3_stmt *stmt = 0; sqlite3_prepare(dump->sql, query.c_str(), -1, &stmt, NULL); assert_sqlite3_ok(dump->sql); int stepresult = SQLITE_DONE; do { stepresult = sqlite3_step(stmt); I(stepresult == SQLITE_DONE || stepresult == SQLITE_ROW); if (stepresult == SQLITE_ROW) dump_row(*(dump->out), stmt, table_name); } while (stepresult == SQLITE_ROW); sqlite3_finalize(stmt); assert_sqlite3_ok(dump->sql); return 0; } static int dump_index_cb(void *data, int n, char **vals, char **cols) { dump_request *dump = reinterpret_cast<dump_request *>(data); I(dump != NULL); I(dump->sql != NULL); I(vals != NULL); I(vals[0] != NULL); I(vals[1] != NULL); I(vals[2] != NULL); I(n == 3); I(string(vals[1]) == "index"); *(dump->out) << vals[2] << ";\n"; return 0; } void database::dump(ostream & out) { // don't care about schema checking etc. 
check_filename(); check_db_exists(); open(); { transaction_guard guard(*this); dump_request req; req.out = &out; req.sql = sql(); out << "BEGIN EXCLUSIVE;\n"; int res; res = sqlite3_exec(req.sql, "SELECT name, type, sql FROM sqlite_master " "WHERE type='table' AND sql NOT NULL " "AND name not like 'sqlite_stat%' " "ORDER BY name", dump_table_cb, &req, NULL); assert_sqlite3_ok(req.sql); res = sqlite3_exec(req.sql, "SELECT name, type, sql FROM sqlite_master " "WHERE type='index' AND sql NOT NULL " "ORDER BY name", dump_index_cb, &req, NULL); assert_sqlite3_ok(req.sql); out << "COMMIT;\n"; guard.commit(); } close(); } void database::load(istream & in) { string line; string sql_stmt; check_filename(); require_path_is_nonexistent(filename, F("cannot create %s; it already exists") % filename); open(); // the page size can only be set before any other commands have been executed sqlite3_exec(__sql, "PRAGMA page_size=8192", NULL, NULL, NULL); while(in) { getline(in, line, ';'); sql_stmt += line + ';'; if (sqlite3_complete(sql_stmt.c_str())) { sqlite3_exec(__sql, sql_stmt.c_str(), NULL, NULL, NULL); sql_stmt.clear(); } } assert_sqlite3_ok(__sql); execute(query("ANALYZE")); } void database::debug(string const & sql, ostream & out) { results res; fetch(res, any_cols, any_rows, query(sql)); out << "'" << sql << "' -> " << res.size() << " rows\n" << endl; for (size_t i = 0; i < res.size(); ++i) { for (size_t j = 0; j < res[i].size(); ++j) { if (j != 0) out << " | "; out << res[i][j]; } out << endl; } } namespace { unsigned long add(unsigned long count, unsigned long & total) { total += count; return count; } } void database::info(ostream & out) { string id; calculate_schema_id(sql(), id); unsigned long total = 0UL; u64 num_nodes; { results res; fetch(res, one_col, any_rows, query("SELECT node FROM next_roster_node_number")); if (res.empty()) num_nodes = 0; else { I(res.size() == 1); num_nodes = lexical_cast<u64>(res[0][0]) - 1; } } #define SPACE_USAGE(TABLE, COLS) 
add(space_usage(TABLE, COLS), total) out << \ F("schema version : %s\n" "counts:\n" " full rosters : %u\n" " roster deltas : %u\n" " full files : %u\n" " file deltas : %u\n" " revisions : %u\n" " ancestry edges : %u\n" " certs : %u\n" " logical files : %u\n" "bytes:\n" " full rosters : %u\n" " roster deltas : %u\n" " full files : %u\n" " file deltas : %u\n" " revisions : %u\n" " cached ancestry : %u\n" " certs : %u\n" " total : %u\n" "database:\n" " page size : %u\n" " cache size : %u\n" ) % id // counts % count("rosters") % count("roster_deltas") % count("files") % count("file_deltas") % count("revisions") % count("revision_ancestry") % count("revision_certs") % num_nodes // bytes % SPACE_USAGE("rosters", "length(id) + length(data)") % SPACE_USAGE("roster_deltas", "length(id) + length(base) + length(delta)") % SPACE_USAGE("files", "length(id) + length(data)") % SPACE_USAGE("file_deltas", "length(id) + length(base) + length(delta)") % SPACE_USAGE("revisions", "length(id) + length(data)") % SPACE_USAGE("revision_ancestry", "length(parent) + length(child)") % SPACE_USAGE("revision_certs", "length(hash) + length(id) + length(name)" " + length(value) + length(keypair) + length(signature)") % total % page_size() % cache_size(); #undef SPACE_USAGE } void database::version(ostream & out) { string id; check_filename(); check_db_exists(); open(); calculate_schema_id(__sql, id); close(); out << F("database schema version: %s") % id << endl; } void database::migrate() { check_filename(); check_db_exists(); open(); migrate_monotone_schema(__sql, __app); close(); } void database::ensure_open() { sqlite3 *s = sql(); I(s != NULL); } void database::ensure_open_for_format_changes() { sqlite3 *s = sql(false, true); I(s != NULL); } database::~database() { L(FL("statement cache statistics")); L(FL("prepared %d statements") % statement_cache.size()); for (map<string, statement>::const_iterator i = statement_cache.begin(); i != statement_cache.end(); ++i) L(FL("%d executions of %s") % 
i->second.count % i->first); // trigger destructors to finalize cached statements statement_cache.clear(); close(); } void database::execute(query const & query) { results res; fetch(res, 0, 0, query); } void database::fetch(results & res, int const want_cols, int const want_rows, query const & query) { int nrow; int ncol; int rescode; res.clear(); res.resize(0); map<string, statement>::iterator i = statement_cache.find(query.sql_cmd); if (i == statement_cache.end()) { statement_cache.insert(make_pair(query.sql_cmd, statement())); i = statement_cache.find(query.sql_cmd); I(i != statement_cache.end()); const char * tail; sqlite3_prepare(sql(), query.sql_cmd.c_str(), -1, i->second.stmt.paddr(), &tail); assert_sqlite3_ok(sql()); L(FL("prepared statement %s") % query.sql_cmd); // no support for multiple statements here E(*tail == 0, F("multiple statements in query: %s\n") % query.sql_cmd); } ncol = sqlite3_column_count(i->second.stmt()); E(want_cols == any_cols || want_cols == ncol, F("wanted %d columns got %d in query: %s") % want_cols % ncol % query.sql_cmd); // bind parameters for this execution int params = sqlite3_bind_parameter_count(i->second.stmt()); // Ensure that exactly the right number of parameters were given I(params == int(query.args.size())); // profiling finds this logging to be quite expensive if (global_sanity.debug) L(FL("binding %d parameters for %s") % params % query.sql_cmd); for (int param = 1; param <= params; param++) { // profiling finds this logging to be quite expensive if (global_sanity.debug) { string log = query.args[param-1].data; if (log.size() > constants::log_line_sz) log = log.substr(0, constants::log_line_sz); L(FL("binding %d with value '%s'") % param % log); } switch (idx(query.args, param - 1).type) { case query_param::text: sqlite3_bind_text(i->second.stmt(), param, idx(query.args, param - 1).data.c_str(), -1, SQLITE_STATIC); break; case query_param::blob: { string const & data = idx(query.args, param - 1).data; 
sqlite3_bind_blob(i->second.stmt(), param, data.data(), data.size(), SQLITE_STATIC); } break; default: I(false); } assert_sqlite3_ok(sql()); } // execute and process results nrow = 0; for (rescode = sqlite3_step(i->second.stmt()); rescode == SQLITE_ROW; rescode = sqlite3_step(i->second.stmt())) { vector<string> row; for (int col = 0; col < ncol; col++) { const char * value = (const char*)sqlite3_column_blob(i->second.stmt(), col); int bytes = sqlite3_column_bytes(i->second.stmt(), col); E(value, F("null result in query: %s") % query.sql_cmd); row.push_back(string(value, value + bytes)); //L(FL("row %d col %d value='%s'") % nrow % col % value); } res.push_back(row); } if (rescode != SQLITE_DONE) assert_sqlite3_ok(sql()); sqlite3_reset(i->second.stmt()); assert_sqlite3_ok(sql()); nrow = res.size(); i->second.count++; E(want_rows == any_rows || want_rows == nrow, F("wanted %d rows got %d in query: %s") % want_rows % nrow % query.sql_cmd); } // general application-level logic void database::set_filename(system_path const & file) { I(!__sql); filename = file; } system_path database::get_filename() { return filename; } void database::begin_transaction(bool exclusive) { if (transaction_level == 0) { I(pending_writes.empty()); if (exclusive) execute(query("BEGIN EXCLUSIVE")); else execute(query("BEGIN DEFERRED")); transaction_exclusive = exclusive; } else { // You can't start an exclusive transaction within a non-exclusive // transaction I(!exclusive || transaction_exclusive); } transaction_level++; } size_t database::size_pending_write(std::string const & tab, hexenc<id> const & id, data const & dat) { return tab.size() + id().size() + dat().size(); } bool database::have_pending_write(string const & tab, hexenc<id> const & id) { return pending_writes.find(make_pair(tab, id)) != pending_writes.end(); } void database::load_pending_write(string const & tab, hexenc<id> const & id, data & dat) { dat = safe_get(pending_writes, make_pair(tab, id)); } // precondition: 
have_pending_write(tab, an_id) == true void database::cancel_pending_write(string const & tab, hexenc<id> const & an_id) { data const & dat = safe_get(pending_writes, make_pair(tab, an_id)); size_t cancel_size = size_pending_write(tab, an_id, dat); I(cancel_size < pending_writes_size); pending_writes_size -= cancel_size; safe_erase(pending_writes, make_pair(tab, an_id)); } void database::schedule_write(string const & tab, hexenc<id> const & an_id, data const & dat) { if (!have_pending_write(tab, an_id)) { safe_insert(pending_writes, make_pair(make_pair(tab, an_id), dat)); pending_writes_size += size_pending_write(tab, an_id, dat); } if (pending_writes_size > constants::db_max_pending_writes_bytes) flush_pending_writes(); } void database::flush_pending_writes() { for (map<pair<string, hexenc<id> >, data>::const_iterator i = pending_writes.begin(); i != pending_writes.end(); ++i) put(i->first.second, i->second, i->first.first); pending_writes.clear(); pending_writes_size = 0; } void database::commit_transaction() { if (transaction_level == 1) { flush_pending_writes(); execute(query("COMMIT")); } transaction_level--; } void database::rollback_transaction() { if (transaction_level == 1) { pending_writes.clear(); pending_writes_size = 0; execute(query("ROLLBACK")); } transaction_level--; } bool database::exists(hexenc<id> const & ident, string const & table) { if (have_pending_write(table, ident)) return true; results res; query q("SELECT id FROM " + table + " WHERE id = ?"); fetch(res, one_col, any_rows, q % text(ident())); I((res.size() == 1) || (res.size() == 0)); return res.size() == 1; } bool database::delta_exists(hexenc<id> const & ident, string const & table) { results res; query q("SELECT id FROM " + table + " WHERE id = ?"); fetch(res, one_col, any_rows, q % text(ident())); return res.size() > 0; } unsigned long database::count(string const & table) { results res; query q("SELECT COUNT(*) FROM " + table); fetch(res, one_col, one_row, q); return 
lexical_cast<unsigned long>(res[0][0]); } unsigned long database::space_usage(string const & table, string const & rowspace) { results res; // COALESCE is required since SUM({empty set}) is NULL. // the sqlite docs for SUM suggest this as a workaround query q("SELECT COALESCE(SUM(" + rowspace + "), 0) FROM " + table); fetch(res, one_col, one_row, q); return lexical_cast<unsigned long>(res[0][0]); } unsigned int database::page_size() { results res; query q("PRAGMA page_size"); fetch(res, one_col, one_row, q); return lexical_cast<unsigned int>(res[0][0]); } unsigned int database::cache_size() { // This returns the persistent (default) cache size. It's possible to // override this setting transiently at runtime by setting PRAGMA // cache_size. results res; query q("PRAGMA default_cache_size"); fetch(res, one_col, one_row, q); return lexical_cast<unsigned int>(res[0][0]); } void database::get_ids(string const & table, set< hexenc<id> > & ids) { results res; query q("SELECT id FROM " + table); fetch(res, one_col, any_rows, q); for (size_t i = 0; i < res.size(); ++i) { ids.insert(hexenc<id>(res[i][0])); } } void database::get(hexenc<id> const & ident, data & dat, string const & table) { if (have_pending_write(table, ident)) { load_pending_write(table, ident, dat); return; } results res; query q("SELECT data FROM " + table + " WHERE id = ?"); fetch(res, one_col, one_row, q % text(ident())); // consistency check gzip<data> rdata(res[0][0]); data rdata_unpacked; decode_gzip(rdata,rdata_unpacked); hexenc<id> tid; calculate_ident(rdata_unpacked, tid); I(tid == ident); dat = rdata_unpacked; } void database::get_delta(hexenc<id> const & ident, hexenc<id> const & base, delta & del, string const & table) { I(ident() != ""); I(base() != ""); results res; query q("SELECT delta FROM " + table + " WHERE id = ? 
AND base = ?"); fetch(res, one_col, one_row, q % text(ident()) % text(base())); gzip<delta> del_packed(res[0][0]); decode_gzip(del_packed, del); } void database::put(hexenc<id> const & ident, data const & dat, string const & table) { // consistency check I(ident() != ""); hexenc<id> tid; calculate_ident(dat, tid); MM(ident); MM(tid); I(tid == ident); gzip<data> dat_packed; encode_gzip(dat, dat_packed); string insert = "INSERT INTO " + table + " VALUES(?, ?)"; execute(query(insert) % text(ident()) % blob(dat_packed())); } void database::put_delta(hexenc<id> const & ident, hexenc<id> const & base, delta const & del, string const & table) { // nb: delta schema is (id, base, delta) I(ident() != ""); I(base() != ""); gzip<delta> del_packed; encode_gzip(del, del_packed); string insert = "INSERT INTO "+table+" VALUES(?, ?, ?)"; execute(query(insert) % text(ident()) % text(base()) % blob(del_packed())); } // static ticker cache_hits("vcache hits", "h", 1); struct datasz { unsigned long operator()(data const & t) { return t().size(); } }; static LRUCache<hexenc<id>, data, datasz> vcache(constants::db_version_cache_sz); typedef vector< hexenc<id> > version_path; static void extend_path_if_not_cycle(string table_name, shared_ptr<version_path> p, hexenc<id> const & ext, set< hexenc<id> > & seen_nodes, vector< shared_ptr<version_path> > & next_paths) { for (version_path::const_iterator i = p->begin(); i != p->end(); ++i) { if ((*i)() == ext()) throw oops("cycle in table '" + table_name + "', at node " + (*i)() + " <- " + ext()); } if (seen_nodes.find(ext) == seen_nodes.end()) { p->push_back(ext); next_paths.push_back(p); seen_nodes.insert(ext); } } void database::get_version(hexenc<id> const & ident, data & dat, string const & data_table, string const & delta_table) { I(ident() != ""); if (vcache.fetch(ident, dat)) { return; } else if (exists(ident, data_table)) { // easy path get(ident, dat, data_table); } else { // tricky path // we start from the file we want to reconstruct 
and work *forwards* // through the database, until we get to a full data object. we then // trace back through the list of edges we followed to get to the data // object, applying reverse deltas. // // the effect of this algorithm is breadth-first search, backwards // through the storage graph, to discover a forwards shortest path, and // then following that shortest path with delta application. // // we used to do this with the boost graph library, but it invovled // loading too much of the storage graph into memory at any moment. this // imperative version only loads the descendents of the reconstruction // node, so it much cheaper in terms of memory. // // we also maintain a cycle-detecting set, just to be safe L(FL("reconstructing %s in %s") % ident % delta_table); I(delta_exists(ident, delta_table)); // Our reconstruction algorithm involves keeping a set of parallel // linear paths, starting from ident, moving forward through the // storage DAG until we hit a storage root. // // On each iteration, we extend every active path by one step. If our // extension involves a fork, we duplicate the path. If any path // contains a cycle, we fault. // // If, by extending a path C, we enter a node which another path // D has already seen, we kill path C. This avoids the possibility of // exponential growth in the number of paths due to extensive forking // and merging. 
vector< shared_ptr<version_path> > live_paths; string delta_query = "SELECT base FROM " + delta_table + " WHERE id = ?"; { shared_ptr<version_path> pth0 = shared_ptr<version_path>(new version_path()); pth0->push_back(ident); live_paths.push_back(pth0); } shared_ptr<version_path> selected_path; set< hexenc<id> > seen_nodes; while (!selected_path) { vector< shared_ptr<version_path> > next_paths; for (vector<shared_ptr<version_path> >::const_iterator i = live_paths.begin(); i != live_paths.end(); ++i) { shared_ptr<version_path> pth = *i; hexenc<id> tip = pth->back(); if (vcache.exists(tip) || exists(tip, data_table)) { selected_path = pth; break; } else { // This tip is not a root, so extend the path. results res; fetch(res, one_col, any_rows, query(delta_query) % text(tip())); I(res.size() != 0); // Replicate the path if there's a fork. for (size_t k = 1; k < res.size(); ++k) { shared_ptr<version_path> pthN = shared_ptr<version_path>(new version_path(*pth)); extend_path_if_not_cycle(delta_table, pthN, hexenc<id>(res[k][0]), seen_nodes, next_paths); } // And extend the base path we're examining. extend_path_if_not_cycle(delta_table, pth, hexenc<id>(res[0][0]), seen_nodes, next_paths); } } I(selected_path || !next_paths.empty()); live_paths = next_paths; } // Found a root, now trace it back along the path. 
I(selected_path); I(selected_path->size() > 1); hexenc<id> curr = selected_path->back(); selected_path->pop_back(); data begin; if (vcache.exists(curr)) { I(vcache.fetch(curr, begin)); } else { get(curr, begin, data_table); } shared_ptr<delta_applicator> app = new_piecewise_applicator(); app->begin(begin()); for (version_path::reverse_iterator i = selected_path->rbegin(); i != selected_path->rend(); ++i) { hexenc<id> const nxt = *i; if (!vcache.exists(curr)) { string tmp; app->finish(tmp); vcache.insert(curr, tmp); } L(FL("following delta %s -> %s") % curr % nxt); delta del; get_delta(nxt, curr, del, delta_table); apply_delta (app, del()); app->next(); curr = nxt; } string tmp; app->finish(tmp); dat = data(tmp); hexenc<id> final; calculate_ident(dat, final); I(final == ident); } vcache.insert(ident, dat); } void database::drop(hexenc<id> const & ident, string const & table) { string drop = "DELETE FROM " + table + " WHERE id = ?"; execute(query(drop) % text(ident())); } void database::put_version(hexenc<id> const & old_id, hexenc<id> const & new_id, delta const & del, string const & data_table, string const & delta_table) { MM(del); data old_data, new_data; delta reverse_delta; MM(old_data); MM(new_data); MM(reverse_delta); get_version(old_id, old_data, data_table, delta_table); patch(old_data, del, new_data); { string tmp; invert_xdelta(old_data(), del(), tmp); reverse_delta = delta(tmp); data old_tmp; hexenc<id> old_tmp_id; patch(new_data, reverse_delta