Below is the file 'netsync.cc' from this revision. You can also download the file.
// copyright (C) 2004 graydon hoare <graydon@pobox.com> // all rights reserved. // licensed to the public under the terms of the GNU GPL (>= 2) // see the file COPYING for details #include <map> #include <string> #include <memory> #include <list> #include <time.h> #include <boost/lexical_cast.hpp> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> #include <boost/regex.hpp> #include "app_state.hh" #include "cert.hh" #include "constants.hh" #include "keys.hh" #include "merkle_tree.hh" #include "netcmd.hh" #include "netio.hh" #include "netsync.hh" #include "numeric_vocab.hh" #include "packet.hh" #include "sanity.hh" #include "transforms.hh" #include "ui.hh" #include "xdelta.hh" #include "epoch.hh" #include "platform.hh" #include "cryptopp/osrng.h" #include "netxx/address.h" #include "netxx/peer.h" #include "netxx/probe.h" #include "netxx/socket.h" #include "netxx/stream.h" #include "netxx/streamserver.h" #include "netxx/timeout.h" // // this is the "new" network synchronization (netsync) system in // monotone. it is based on synchronizing a pair of merkle trees over an // interactive connection. // // a netsync process between peers treats each peer as either a source, a // sink, or both. when a peer is only a source, it will not write any new // items to its database. when a peer is only a sink, it will not send any // items from its database. when a peer is both a source and sink, it may // send and write items freely. // // the post-state of a netsync is that each sink contains a superset of the // items in its corresponding source; when peers are behaving as both // source and sink, this means that the post-state of the sync is for the // peers to have identical item sets. // // a peer can be a sink in at most one netsync process at a time; it can // however be a source for multiple netsyncs simultaneously. // // // data structure // -------------- // // each node in a merkle tree contains a fixed number of slots. 
this number // is derived from a global parameter of the protocol -- the tree fanout -- // such that the number of slots is 2^fanout. for now we will assume that // fanout is 4 thus there are 16 slots in a node, because this makes // illustration easier. the other parameter of the protocol is the size of // a hash; we use SHA1 so the hash is 20 bytes (160 bits) long. // // each slot in a merkle tree node is in one of 4 states: // // - empty // - live leaf // - dead leaf // - subtree // // in addition, each live or dead leaf contains a hash code which // identifies an element of the set being synchronized. each subtree slot // contains a hash code of the node immediately beneath it in the merkle // tree. empty slots contain no hash codes. // // each node also summarizes, for sake of statistic-gathering, the number // of set elements and total number of bytes in all of its subtrees, each // stored as a size_t and sent as a uleb128. // // since empty slots have no hash code, they are represented implicitly by // a bitmap at the head of each merkle tree node. as an additional // integrity check, each merkle tree node contains a label indicating its // prefix in the tree, and a hash of its own contents. // // in total, then, the byte-level representation of a <160,4> merkle tree // node is as follows: // // 20 bytes - hash of the remaining bytes in the node // 1 byte - type of this node (manifest, file, key, mcert, fcert) // 1-N bytes - level of this node in the tree (0 == "root", uleb128) // 0-20 bytes - the prefix of this node, 4 bits * level, // rounded up to a byte // 1-N bytes - number of leaves under this node (uleb128) // 4 bytes - slot-state bitmap of the node // 0-320 bytes - between 0 and 16 live slots in the node // // so, in the worst case such a node is 367 bytes, with these parameters. 
// // // protocol // -------- // // The protocol is a simple binary command-packet system over TCP; // each packet consists of a single byte which identifies the protocol // version, a byte which identifies the command name inside that // version, a size_t sent as a uleb128 indicating the length of the // packet, that many bytes of payload, and finally 20 bytes of SHA-1 // HMAC calculated over the payload. The key for the SHA-1 HMAC is 20 // bytes of 0 during authentication, and a 20-byte random key chosen // by the client after authentication (discussed below). // //---- Pre-v5 packet format ---- // // the protocol is a simple binary command-packet system over tcp; each // packet consists of a byte which identifies the protocol version, a byte // which identifies the command name inside that version, a size_t sent as // a uleb128 indicating the length of the packet, and then that many bytes // of payload, and finally 4 bytes of adler32 checksum (in LSB order) over // the payload. // // ---- end pre-v5 packet format ---- // // decoding involves simply buffering until a sufficient number of // bytes are received, then advancing the buffer pointer. any time an // integrity check (adler32 for pre-v5, HMAC for post-v5) fails, the // protocol is assumed to have lost synchronization, and the // connection is dropped. the parties are free to drop the tcp stream // at any point, if too much data is received or too much idle time // passes; no commitments or transactions are made. // // one special command, "bye", is used to shut down a connection // gracefully. once each side has received all the data they want, they // can send a "bye" command to the other side. as soon as either side has // both sent and received a "bye" command, they drop the connection. if // either side sees an i/o failure (dropped connection) after they have // sent a "bye" command, they consider the shutdown successful. // // the exchange begins in a non-authenticated state. 
the server sends a // "hello <id> <nonce>" command, which identifies the server's RSA key and // issues a nonce which must be used for a subsequent authentication. // // The client then responds with either: // // An "auth (source|sink|both) <pattern> <id> <nonce1> <hmac key> // <sig>" command, which identifies its RSA key, notes the role it // wishes to play in the synchronization, identifies the pattern it // wishes to sync with, signs the previous nonce with its own key, and // informs the server of the HMAC key it wishes to use for this // session (encrypted with the server's public key); or // // An "anonymous (source|sink|both) <pattern> <hmac key>" command, // which identifies the role it wishes to play in the synchronization, // the pattern it wishes to sync with, and the HMAC key it wishes to // use for this session (also encrypted with the server's public key). // // The server then replies with a "confirm" command, which contains no // other data but will only have the correct HMAC integrity code if // the server received and properly decrypted the HMAC key offered by // the client. This transitions the peers into an authenticated state // and begins refinement. // // ---- Pre-v5 authentication process notes ---- // // the exchange begins in a non-authenticated state. the server sends a // "hello <id> <nonce>" command, which identifies the server's RSA key and // issues a nonce which must be used for a subsequent authentication. // the client can then respond with an "auth (source|sink|both) // <pattern> <id> <nonce1> <nonce2> <sig>" command which identifies its // RSA key, notes the role it wishes to play in the synchronization, // identifies the pattern it wishes to sync with, signs the previous // nonce with its own key, and issues a nonce of its own for mutual // authentication. // // the server can then respond with a "confirm <sig>" command, which is // the signature of the second nonce sent by the client.
this // transitions the peers into an authenticated state and begins refinement. // // ---- End pre-v5 authentication process ---- // // refinement begins with the client sending its root public key and // manifest certificate merkle nodes to the server. the server then // compares the root to each slot in *its* root node, and for each slot // either sends refined subtrees to the client, or (if it detects a missing // item in one pattern or the other) sends either "data" or "send_data" // commands corresponding to the role of the missing item (source or // sink). the client then receives each refined subtree and compares it // with its own, performing similar description/request behavior depending // on role, and the cycle continues. // // detecting the end of refinement is subtle: after sending the refinement // of the root node, the server sends a "done 0" command (queued behind all // the other refinement traffic). when either peer receives a "done N" // command it immediately responds with a "done N+1" command. when two done // commands for a given merkle tree arrive with no intervening refinements, // the entire merkle tree is considered complete. // // any "send_data" command received prompts a "data" command in response, // if the requested item exists. if an item does not exist, a "nonexistant" // response command is sent. // // once a response is received for each requested key and revision cert // (either data or nonexistant) the requesting party walks the graph of // received revision certs and transmits send_data or send_delta commands // for all the revisions mentioned in the certs which it does not already // have in its database. // // for each revision it receives, the recipient requests all the file data or // deltas described in that revision. // // once all requested files, manifests, revisions and certs are received (or // noted as nonexistant), the recipient closes its connection.
//
// (aside: this protocol is raw binary because coding density is actually
// important here, and each packet consists of very information-dense
// material that you wouldn't have a hope of typing in manually anyway)
//

using namespace boost;
using namespace std;

// Decoding guard: throws bad_decode naming `context` when `check` is false.
static inline void
require(bool check, string const & context)
{
  if (!check)
    throw bad_decode(F("check of '%s' failed") % context);
}

// Per-item-type refinement progress; both flags start out false.
// session::done_refinements holds one of these per netcmd_item_type.
struct
done_marker
{
  bool current_level_had_refinements;
  bool tree_is_done;
  done_marker() :
    current_level_had_refinements(false),
    tree_is_done(false)
  {}
};

// State for one netsync connection with a peer.
// NOTE: member declaration order matters -- the constructor's initializer
// list follows it, and C++ initializes members in declaration order.
struct
session
{
  // Role this side plays (sink / source / both); setup_client_tickers()
  // switches on it.
  protocol_role role;
  // client_voice or server voice; fixed for the life of the session.
  protocol_voice const voice;
  // Patterns supplied at construction; a client must pass exactly one
  // (checked in the constructor).
  vector<utf8> patterns;
  app_state & app;

  string peer_id;
  Netxx::socket_type fd;
  Netxx::Stream str;

  // Raw input / output byte buffers; outgoing netcmds are serialized into
  // outbuf by write_netcmd_and_try_flush().
  string inbuf;
  string outbuf;

  netcmd cmd;
  bool armed;
  bool arm();

  // Default to "" / ".*"; for a client the constructor sets them from the
  // single entry of `patterns`.
  utf8 pattern;
  boost::regex pattern_re;
  id remote_peer_key_hash;
  rsa_keypair_id remote_peer_key_name;
  // All three start from constants::netsync_key_initializer;
  // session_key and write_hmac are used when writing netcmds.
  netsync_session_key session_key;
  netsync_hmac_value read_hmac;
  netsync_hmac_value write_hmac;
  bool authenticated;

  // Updated by mark_recent_io().
  time_t last_io_time;
  // Progress tickers: NULL after construction; setup_client_tickers()
  // creates the ones relevant to `role`. Users test .get() before use.
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;
  auto_ptr<ticker> revision_checked_ticker;

  // Filled by the dbw write callbacks; reported to Lua hooks in ~session().
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  map<netcmd_item_type, boost::shared_ptr<merkle_table> > merkle_tables;

  // Refinement completion per item type (cert, key and epoch are
  // registered by the constructor).
  map<netcmd_item_type, done_marker> done_refinements;
  // Outstanding requested ids per item type (all six types are
  // registered by the constructor).
  map<netcmd_item_type, boost::shared_ptr< set<id> > > requested_items;
  // Received revisions keyed by id: (raw revision data, parsed revision_set).
  map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestry;
  map<revision_id, map<cert_name, vector<cert> > > received_certs;
  // (child, parent) id pairs requested as reverse deltas by
  // request_rev_revisions().
  set< pair<id, id> > reverse_delta_requests;
  bool analyzed_ancestry;

  // Set once by mk_nonce().
  id saved_nonce;
  bool received_goodbye;
  bool sent_goodbye;
  boost::scoped_ptr<CryptoPP::AutoSeededRandomPool> prng;

  // Database writer; its valve is opened by maybe_note_epochs_finished().
  packet_db_valve dbw;

  session(protocol_role role,
          protocol_voice voice,
          vector<utf8> const & patterns,
          app_state & app,
          string const & peer,
          Netxx::socket_type sock,
          Netxx::Timeout const & to);

  virtual ~session();

  void rev_written_callback(revision_id rid);
  void key_written_callback(rsa_keypair_id kid);
  void cert_written_callback(cert const & c);

  id mk_nonce();
  void mark_recent_io();

  void setup_client_tickers();

  bool done_all_refinements();
  bool cert_refinement_done();
  bool all_requested_revisions_received();

  void note_item_requested(netcmd_item_type ty, id const & i);
  bool item_request_outstanding(netcmd_item_type ty, id const & i);
  void note_item_arrived(netcmd_item_type ty, id const & i);

  void maybe_note_epochs_finished();

  void note_item_sent(netcmd_item_type ty, id const & i);

  bool got_all_data();
  void maybe_say_goodbye();

  void analyze_attachment(revision_id const & i,
                          set<revision_id> & visited,
                          map<revision_id, bool> & attached);
  void request_rev_revisions(revision_id const & init,
                             map<revision_id, bool> attached,
                             set<revision_id> visited);
  void request_fwd_revisions(revision_id const & i,
                             map<revision_id, bool> attached,
                             set<revision_id> & visited);
  void analyze_ancestry_graph();
  void analyze_manifest(manifest_map const & man);

  Netxx::Probe::ready_type which_events() const;
  bool read_some();
  bool write_some();

  // Set by error(); once true, write_netcmd_and_try_flush() drops
  // outgoing netcmds.
  bool encountered_error;
  void error(string const & errmsg);

  void write_netcmd_and_try_flush(netcmd const & cmd);
  void queue_bye_cmd();
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(size_t level, netcmd_item_type type);
  void queue_hello_cmd(id const & server,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           string const & pattern,
                           id const & nonce2,
                           base64<rsa_pub_key> server_key_encoded);
  void queue_auth_cmd(protocol_role role,
                      string const & pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      string const & signature,
                      base64<rsa_pub_key> server_key_encoded);
  void queue_confirm_cmd();
  void queue_refine_cmd(merkle_node const & node);
  void queue_send_data_cmd(netcmd_item_type type,
                           id const & item);
  void queue_send_delta_cmd(netcmd_item_type type,
                            id const & base,
                            id const & ident);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);
  void queue_nonexistant_cmd(netcmd_item_type type,
                             id const & item);

  bool process_bye_cmd();
  bool process_error_cmd(string const & errmsg);
  bool process_done_cmd(size_t level, netcmd_item_type type);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_anonymous_cmd(protocol_role role,
                             string const & pattern);
  bool process_auth_cmd(protocol_role role,
                        string const & pattern,
                        id const & client,
                        id const & nonce1,
                        string const & signature);
  void respond_to_auth_cmd(rsa_oaep_sha_data hmac_key_encrypted);
  bool process_confirm_cmd(string const & signature);
  void respond_to_confirm_cmd();
  bool process_refine_cmd(merkle_node const & node);
  bool process_send_data_cmd(netcmd_item_type type,
                             id const & item);
  bool process_send_delta_cmd(netcmd_item_type type,
                              id const & base,
                              id const & ident);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_nonexistant_cmd(netcmd_item_type type,
                               id const & item);

  bool merkle_node_exists(netcmd_item_type type,
                          size_t level,
                          prefix const & pref);

  void load_merkle_node(netcmd_item_type type,
                        size_t level,
                        prefix const & pref,
                        merkle_ptr & node);

  void rebuild_merkle_trees(app_state & app,
                            set<utf8> const & branches);

  bool dispatch_payload(netcmd const & cmd);
  void begin_service();
  bool process();
};

// Wrapper so the (empty) root prefix can be built lazily; see below.
struct
root_prefix
{
  prefix val;
  root_prefix() : val("")
  {}
};

// Returns a function-local static root_prefix whose val is the empty prefix.
static root_prefix const &
get_root_prefix()
{
  // this is not a static variable for a bizarre reason: mac OSX runs
  // static initializers in the "wrong" order (application before
  // libraries), so the initializer for a static string in cryptopp runs
  // after the initializer for a static variable outside a function
  // here. therefore encode_hexenc() fails in the static initializer here
  // and the program crashes. curious, eh?
  static root_prefix ROOT_PREFIX;
  return ROOT_PREFIX;
}

// Builds a session over socket `sock`.
//  - A client must supply exactly one pattern (N() check); it becomes
//    `pattern` / `pattern_re`.
//  - Hooks the dbw write callbacks up to the *_written_callback members.
//  - Creates the PRNG, blocking if the Lua hook rejects a non-blocking
//    RNG; throws oops if no blocking RNG was compiled in.
//  - Registers done markers (cert/key/epoch) and empty request sets for
//    all six item types.
session::session(protocol_role role,
                 protocol_voice voice,
                 vector<utf8> const & patterns,
                 app_state & app,
                 string const & peer,
                 Netxx::socket_type sock,
                 Netxx::Timeout const & to) :
  role(role),
  voice(voice),
  patterns(patterns),
  app(app),
  peer_id(peer),
  fd(sock),
  str(sock, to),
  inbuf(""),
  outbuf(""),
  armed(false),
  pattern(""),
  pattern_re(".*"),
  remote_peer_key_hash(""),
  remote_peer_key_name(""),
  session_key(constants::netsync_key_initializer),
  read_hmac(constants::netsync_key_initializer),
  write_hmac(constants::netsync_key_initializer),
  authenticated(false),
  last_io_time(::time(NULL)),
  byte_in_ticker(NULL),
  byte_out_ticker(NULL),
  cert_in_ticker(NULL),
  cert_out_ticker(NULL),
  revision_in_ticker(NULL),
  revision_out_ticker(NULL),
  revision_checked_ticker(NULL),
  analyzed_ancestry(false),
  saved_nonce(""),
  received_goodbye(false),
  sent_goodbye(false),
  dbw(app, true),
  encountered_error(false)
{
  if (voice == client_voice)
    {
      N(patterns.size() == 1,
        F("client can only sync one pattern at a time"));
      this->pattern = idx(patterns, 0);
      this->pattern_re = boost::regex(this->pattern());
    }

  dbw.set_on_revision_written(boost::bind(&session::rev_written_callback,
                                          this, _1));
  dbw.set_on_cert_written(boost::bind(&session::cert_written_callback,
                                      this, _1));
  dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback,
                                        this, _1));

  // we will panic here if the user doesn't like urandom and we can't give
  // them a real entropy-driven random.
  bool request_blocking_rng = false;
  if (!app.lua.hook_non_blocking_rng_ok())
    {
#ifndef BLOCKING_RNG_AVAILABLE
      throw oops("no blocking RNG available and non-blocking RNG rejected");
#else
      request_blocking_rng = true;
#endif
    }
  prng.reset(new CryptoPP::AutoSeededRandomPool(request_blocking_rng));

  done_refinements.insert(make_pair(cert_item, done_marker()));
  done_refinements.insert(make_pair(key_item, done_marker()));
  done_refinements.insert(make_pair(epoch_item, done_marker()));

  requested_items.insert(make_pair(cert_item, boost::shared_ptr< set<id> >(new set<id>())));
  requested_items.insert(make_pair(key_item, boost::shared_ptr< set<id> >(new set<id>())));
  requested_items.insert(make_pair(revision_item, boost::shared_ptr< set<id> >(new set<id>())));
  requested_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
  requested_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
  requested_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>())));
}

// Reports everything written during the session to the Lua hooks.
//  - Each written key is passed to hook_note_netsync_pubkey_received.
//  - Each written revision is passed to hook_note_netsync_revision_received,
//    together with the (decoded) certs written for it in this session.
//  - Each remaining cert, whose revision was not written in this session,
//    is passed to hook_note_netsync_cert_received.
// NOTE(review): this destructor calls Lua hooks and decode_base64; if any
// of them can throw, the exception escapes a destructor -- confirm they
// cannot, or guard these calls.
session::~session()
{
  vector<cert> unattached_certs;
  map<revision_id, vector<cert> > revcerts;
  // Bucket the written certs by their revision, if that revision was also
  // written in this session.
  for (vector<revision_id>::iterator i = written_revisions.begin();
       i != written_revisions.end(); ++i)
    revcerts.insert(make_pair(*i, vector<cert>()));
  for (vector<cert>::iterator i = written_certs.begin();
       i != written_certs.end(); ++i)
    {
      map<revision_id, vector<cert> >::iterator j;
      j = revcerts.find(i->ident);
      if (j == revcerts.end())
        unattached_certs.push_back(*i);
      else
        j->second.push_back(*i);
    }

  //Keys
  for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
       i != written_keys.end(); ++i)
    {
      app.lua.hook_note_netsync_pubkey_received(*i);
    }
  //Revisions
  for (vector<revision_id>::iterator i = written_revisions.begin();
       i != written_revisions.end(); ++i)
    {
      vector<cert> & ctmp(revcerts[*i]);
      set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
      for (vector<cert>::const_iterator j = ctmp.begin();
           j != ctmp.end(); ++j)
        {
          cert_value vtmp;
          decode_base64(j->value, vtmp);
          certs.insert(make_pair(j->key, make_pair(j->name, vtmp)));
        }
      app.lua.hook_note_netsync_revision_received(*i, certs);
    }
  //Certs (not attached to a new revision)
  for (vector<cert>::iterator i = unattached_certs.begin();
       i != unattached_certs.end(); ++i)
    {
      cert_value tmp;
      decode_base64(i->value, tmp);
      app.lua.hook_note_netsync_cert_received(i->ident, i->key,
                                              i->name, tmp);
    }
}

// dbw callback: bumps the "revs written" ticker (if any) and records `rid`.
void
session::rev_written_callback(revision_id rid)
{
  if (revision_checked_ticker.get())
    ++(*revision_checked_ticker);
  written_revisions.push_back(rid);
}

// dbw callback: records a written public key id.
void
session::key_written_callback(rsa_keypair_id kid)
{
  written_keys.push_back(kid);
}

// dbw callback: records a written cert.
void
session::cert_written_callback(cert const & c)
{
  written_certs.push_back(c);
}

// Generates merkle_hash_length_in_bytes random bytes from prng, stores
// them in saved_nonce and returns them. Asserts saved_nonce was empty
// beforehand.
id
session::mk_nonce()
{
  I(this->saved_nonce().size() == 0);
  char buf[constants::merkle_hash_length_in_bytes];
  prng->GenerateBlock(reinterpret_cast<byte *>(buf), constants::merkle_hash_length_in_bytes);
  this->saved_nonce = string(buf, buf + constants::merkle_hash_length_in_bytes);
  I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
  return this->saved_nonce;
}

// Records the current wall-clock time as the last I/O time.
void
session::mark_recent_io()
{
  last_io_time = ::time(NULL);
}

// Creates byte in/out tickers plus the item tickers that fit `role`.
// In source_and_sink_role no cert tickers are created (only revision
// tickers) -- presumably deliberate; left as-is.
void
session::setup_client_tickers()
{
  byte_in_ticker.reset(new ticker("bytes in", ">", 1024, true));
  byte_out_ticker.reset(new ticker("bytes out", "<", 1024, true));
  if (role == sink_role)
    {
      revision_checked_ticker.reset(new ticker("revs written", "w", 1));
      cert_in_ticker.reset(new ticker("certs in", "c", 3));
      revision_in_ticker.reset(new ticker("revs in", "r", 1));
    }
  else if (role == source_role)
    {
      cert_out_ticker.reset(new ticker("certs out", "C", 3));
      revision_out_ticker.reset(new ticker("revs out", "R", 1));
    }
  else
    {
      I(role == source_and_sink_role);
      revision_checked_ticker.reset(new ticker("revs written", "w", 1));
      revision_in_ticker.reset(new ticker("revs in", "r", 1));
      revision_out_ticker.reset(new ticker("revs out", "R", 1));
    }
}

// True iff every entry of done_refinements has tree_is_done set.
bool
session::done_all_refinements()
{
  bool all = true;
  for (map<netcmd_item_type, done_marker>::const_iterator j =
         done_refinements.begin(); j != done_refinements.end(); ++j)
    {
      if (j->second.tree_is_done == false)
        all = false;
    }
  return all;
}

// True once the cert merkle tree has been fully refined. (operator[]
// would insert a fresh marker if cert_item were missing, but the
// constructor always registers it.)
bool
session::cert_refinement_done()
{
  return done_refinements[cert_item].tree_is_done;
}

// True when no item type has any outstanding request.
bool
session::got_all_data()
{
  for (map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i =
         requested_items.begin(); i != requested_items.end(); ++i)
    {
      if (! i->second->empty())
        return false;
    }
  return true;
}

// True when no revision request is outstanding.
bool
session::all_requested_revisions_received()
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(revision_item);
  I(i != requested_items.end());
  return i->second->empty();
}

// Opens the database valve once the epoch tree is fully refined and no
// epoch requests remain outstanding; otherwise does nothing.
void
session::maybe_note_epochs_finished()
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(epoch_item);
  I(i != requested_items.end());
  // Maybe there are outstanding epoch requests.
  if (!i->second->empty())
    return;
  // And maybe we haven't even finished the refinement.
  if (!done_refinements[epoch_item].tree_is_done)
    return;
  // But otherwise, we're ready to go!
  L(F("all epochs processed, opening database valve\n"));
  this->dbw.open_valve();
}

// Adds `ident` to the outstanding-request set for item type `ty`.
void
session::note_item_requested(netcmd_item_type ty, id const & ident)
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(ty);
  I(i != requested_items.end());
  i->second->insert(ident);
}

// Removes `ident` from the outstanding set for `ty` and bumps the matching
// "in" ticker (certs and revisions only), if it exists.
void
session::note_item_arrived(netcmd_item_type ty, id const & ident)
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(ty);
  I(i != requested_items.end());
  i->second->erase(ident);

  switch (ty)
    {
    case cert_item:
      if (cert_in_ticker.get() != NULL)
        ++(*cert_in_ticker);
      break;
    case revision_item:
      if (revision_in_ticker.get() != NULL)
        ++(*revision_in_ticker);
      break;
    default:
      // No ticker for other things.
      break;
    }
}

// True if `ident` is still awaited for item type `ty`.
bool
session::item_request_outstanding(netcmd_item_type ty, id const & ident)
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(ty);
  I(i != requested_items.end());
  return i->second->find(ident) != i->second->end();
}

// Bumps the matching "out" ticker (certs and revisions only), if it
// exists. `ident` is not used.
void
session::note_item_sent(netcmd_item_type ty, id const & ident)
{
  switch (ty)
    {
    case cert_item:
      if (cert_out_ticker.get() != NULL)
        ++(*cert_out_ticker);
      break;
    case revision_item:
      if (revision_out_ticker.get() != NULL)
        ++(*revision_out_ticker);
      break;
    default:
      // No ticker for other things.
      break;
    }
}

// Serializes `cmd` into outbuf using the session key and write HMAC,
// unless error unwind mode is active, in which case the command is dropped.
void
session::write_netcmd_and_try_flush(netcmd const & cmd)
{
  if (!encountered_error)
    cmd.write(outbuf, session_key, write_hmac);
  else
    L(F("dropping outgoing netcmd (because we're in error unwind mode)\n"));
  // FIXME: this helps keep the protocol pipeline full but it seems to
  // interfere with initial and final sequences.  careful with it.
  // write_some();
  // read_some();
}

// This method triggers a special "error unwind" mode to netsync.  In this
// mode, all received data is ignored, and no new data is queued.  We simply
// stay connected long enough for the current write buffer to be flushed, to
// ensure that our peer receives the error message.
// WARNING WARNING WARNING (FIXME): this does _not_ throw an exception.  if
// while processing any given netcmd packet you encounter an error, you can
// _only_ call this method if you have not touched the database, because if
// you have touched the database then you need to throw an exception to
// trigger a rollback.
// you could, of course, call this method and then throw an exception, but
// there is no point in doing that, because throwing the exception will cause
// the connection to be immediately terminated, so your call to error() will
// actually have no effect (except to cause your error message to be printed
// twice).
void session::error(std::string const & errmsg) { W(F("error: %s\n") % errmsg); queue_error_cmd(errmsg); encountered_error = true; } void session::analyze_manifest(manifest_map const & man) { L(F("analyzing %d entries in manifest\n") % man.size()); for (manifest_map::const_iterator i = man.begin(); i != man.end(); ++i) { if (! this->app.db.file_version_exists(manifest_entry_id(i))) { id tmp; decode_hexenc(manifest_entry_id(i).inner(), tmp); queue_send_data_cmd(file_item, tmp); } } } static bool is_attached(revision_id const & i, map<revision_id, bool> const & attach_map) { map<revision_id, bool>::const_iterator j = attach_map.find(i); I(j != attach_map.end()); return j->second; } // this tells us whether a particular revision is "attached" -- meaning // either our database contains the underlying manifest or else one of our // parents (recursively, and only in the current ancestry graph we're // requesting) is attached. if it's detached we will request it using a // different (more efficient and less failure-prone) algorithm void session::analyze_attachment(revision_id const & i, set<revision_id> & visited, map<revision_id, bool> & attached) { typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT; if (visited.find(i) != visited.end()) return; visited.insert(i); bool curr_attached = false; if (app.db.revision_exists(i)) { L(F("revision %s is attached via database\n") % i); curr_attached = true; } else { L(F("checking attachment of %s in ancestry\n") % i); ancestryT::const_iterator j = ancestry.find(i); if (j != ancestry.end()) { for (edge_map::const_iterator k = j->second->second.edges.begin(); k != j->second->second.edges.end(); ++k) { L(F("checking attachment of %s in parent %s\n") % i % edge_old_revision(k)); analyze_attachment(edge_old_revision(k), visited, attached); if (is_attached(edge_old_revision(k), attached)) { L(F("revision %s is attached via parent %s\n") % i % edge_old_revision(k)); curr_attached = true; } } } } 
L(F("decided that revision %s %s attached\n") % i % (curr_attached ? "is" : "is not")); attached[i] = curr_attached; } inline static id plain_id(manifest_id const & i) { id tmp; hexenc<id> htmp(i.inner()); decode_hexenc(htmp, tmp); return tmp; } inline static id plain_id(file_id const & i) { id tmp; hexenc<id> htmp(i.inner()); decode_hexenc(htmp, tmp); return tmp; } void session::request_rev_revisions(revision_id const & init, map<revision_id, bool> attached, set<revision_id> visited) { typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT; set<manifest_id> seen_manifests; set<file_id> seen_files; set<revision_id> frontier; frontier.insert(init); while(!frontier.empty()) { set<revision_id> next_frontier; for (set<revision_id>::const_iterator i = frontier.begin(); i != frontier.end(); ++i) { if (is_attached(*i, attached)) continue; if (visited.find(*i) != visited.end()) continue; visited.insert(*i); ancestryT::const_iterator j = ancestry.find(*i); if (j != ancestry.end()) { for (edge_map::const_iterator k = j->second->second.edges.begin(); k != j->second->second.edges.end(); ++k) { next_frontier.insert(edge_old_revision(k)); // check out the manifest delta edge manifest_id parent_manifest = edge_old_manifest(k); manifest_id child_manifest = j->second->second.new_manifest; // first, if we have a child we've never seen before we will need // to request it in its entrety. 
if (seen_manifests.find(child_manifest) == seen_manifests.end()) { if (this->app.db.manifest_version_exists(child_manifest)) L(F("not requesting (in reverse) initial manifest %s as we already have it\n") % child_manifest); else { L(F("requesting (in reverse) initial manifest data %s\n") % child_manifest); queue_send_data_cmd(manifest_item, plain_id(child_manifest)); } seen_manifests.insert(child_manifest); } // second, if the parent is nonempty, we want to ask for an edge to it if (!parent_manifest.inner()().empty()) { if (this->app.db.manifest_version_exists(parent_manifest)) L(F("not requesting (in reverse) manifest delta to %s as we already have it\n") % parent_manifest); else { L(F("requesting (in reverse) manifest delta %s -> %s\n") % child_manifest % parent_manifest); reverse_delta_requests.insert(make_pair(plain_id(child_manifest), plain_id(parent_manifest))); queue_send_delta_cmd(manifest_item, plain_id(child_manifest), plain_id(parent_manifest)); } seen_manifests.insert(parent_manifest); } // check out each file delta edge change_set const & cset = edge_changes(k); for (change_set::delta_map::const_iterator d = cset.deltas.begin(); d != cset.deltas.end(); ++d) { file_id parent_file (delta_entry_src(d)); file_id child_file (delta_entry_dst(d)); // first, if we have a child we've never seen before we will need // to request it in its entrety. 
if (seen_files.find(child_file) == seen_files.end()) { if (this->app.db.file_version_exists(child_file)) L(F("not requesting (in reverse) initial file %s as we already have it\n") % child_file); else { L(F("requesting (in reverse) initial file data %s\n") % child_file); queue_send_data_cmd(file_item, plain_id(child_file)); } seen_files.insert(child_file); } // second, if the parent is nonempty, we want to ask for an edge to it if (!parent_file.inner()().empty()) { if (this->app.db.file_version_exists(parent_file)) L(F("not requesting (in reverse) file delta to %s as we already have it\n") % parent_file); else { L(F("requesting (in reverse) file delta %s -> %s on %s\n") % child_file % parent_file % delta_entry_path(d)); reverse_delta_requests.insert(make_pair(plain_id(child_file), plain_id(parent_file))); queue_send_delta_cmd(file_item, plain_id(child_file), plain_id(parent_file)); } seen_files.insert(parent_file); } } } // now actually consume the data packet, which will wait on the // arrival of its prerequisites in the packet_db_writer this->dbw.consume_revision_data(j->first, j->second->first); } } frontier = next_frontier; } } void session::request_fwd_revisions(revision_id const & i, map<revision_id, bool> attached, set<revision_id> & visited) { if (visited.find(i) != visited.end()) return; visited.insert(i); L(F("visiting revision '%s' for forward deltas\n") % i); typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT; ancestryT::const_iterator j = ancestry.find(i); if (j != ancestry.end()) { edge_map::const_iterator an_attached_edge = j->second->second.edges.end(); // first make sure we've requested enough to get to here by // calling ourselves recursively. this is the forward path after all. 
for (edge_map::const_iterator k = j->second->second.edges.begin(); k != j->second->second.edges.end(); ++k) { if (is_attached(edge_old_revision(k), attached)) { request_fwd_revisions(edge_old_revision(k), attached, visited); an_attached_edge = k; } } I(an_attached_edge != j->second->second.edges.end()); // check out the manifest delta edge manifest_id parent_manifest = edge_old_manifest(an_attached_edge); manifest_id child_manifest = j->second->second.new_manifest; if (this->app.db.manifest_version_exists(child_manifest)) L(F("not requesting forward manifest delta to '%s' as we already have it\n") % child_manifest); else { if (parent_manifest.inner()().empty()) { L(F("requesting full manifest data %s\n") % child_manifest); queue_send_data_cmd(manifest_item, plain_id(child_manifest)); } else { L(F("requesting forward manifest delta %s -> %s\n") % parent_manifest % child_manifest); queue_send_delta_cmd(manifest_item, plain_id(parent_manifest), plain_id(child_manifest)); } } // check out each file delta edge change_set const & an_attached_cset = edge_changes(an_attached_edge); for (change_set::delta_map::const_iterator k = an_attached_cset.deltas.begin(); k != an_attached_cset.deltas.end(); ++k) { if (this->app.db.file_version_exists(delta_entry_dst(k))) L(F("not requesting forward delta %s -> %s on file %s as we already have it\n") % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k)); else { if (delta_entry_src(k).inner()().empty()) { L(F("requesting full file data %s\n") % delta_entry_dst(k)); queue_send_data_cmd(file_item, plain_id(delta_entry_dst(k))); } else { L(F("requesting forward delta %s -> %s on file %s\n") % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k)); queue_send_delta_cmd(file_item, plain_id(delta_entry_src(k)), plain_id(delta_entry_dst(k))); } } } // now actually consume the data packet, which will wait on the // arrival of its prerequisites in the packet_db_writer this->dbw.consume_revision_data(j->first, 
j->second->first); } } void session::analyze_ancestry_graph() { typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT; typedef map<cert_name, vector<cert> > cert_map; if (! (all_requested_revisions_received() && cert_refinement_done())) return; if (analyzed_ancestry) return; set<revision_id> heads; { set<revision_id> nodes, parents; map<revision_id, int> chld_num; L(F("analyzing %d ancestry edges\n") % ancestry.size()); for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i) { nodes.insert(i->first); for (edge_map::const_iterator j = i->second->second.edges.begin(); j != i->second->second.edges.end(); ++j) { parents.insert(edge_old_revision(j)); map<revision_id, int>::iterator n; n = chld_num.find(edge_old_revision(j)); if (n == chld_num.end()) chld_num.insert(make_pair(edge_old_revision(j), 1)); else ++(n->second); } } set_difference(nodes.begin(), nodes.end(), parents.begin(), parents.end(), inserter(heads, heads.begin())); // Write permissions checking: // remove heads w/o proper certs, add their children to heads // 1) remove unwanted branch certs from consideration // - server: check write permission hook // - client: check against sync pattern // 2) remove heads w/o a branch tag, process new exposed heads // 3) repeat 2 until no change //1 set<string> ok_branches, bad_branches; cert_name bcert_name(branch_cert_name); cert_name tcert_name(tag_cert_name); for (map<revision_id, cert_map>::iterator i = received_certs.begin(); i != received_certs.end(); ++i) {