Below is the file 'netsync.cc' from this revision. You can also download the file.
// copyright (C) 2004 graydon hoare <graydon@pobox.com> // all rights reserved. // licensed to the public under the terms of the GNU GPL (>= 2) // see the file COPYING for details #include <map> #include <string> #include <memory> #include <list> #include <deque> #include <stack> #include <time.h> #include <boost/lexical_cast.hpp> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> #include <boost/regex.hpp> #include "app_state.hh" #include "cert.hh" #include "constants.hh" #include "keys.hh" #include "merkle_tree.hh" #include "netcmd.hh" #include "netio.hh" #include "netsync.hh" #include "numeric_vocab.hh" #include "packet.hh" #include "sanity.hh" #include "transforms.hh" #include "ui.hh" #include "xdelta.hh" #include "epoch.hh" #include "platform.hh" #include "hmac.hh" #include "globish.hh" #include "botan/botan.h" #include "netxx/address.h" #include "netxx/peer.h" #include "netxx/probe.h" #include "netxx/socket.h" #include "netxx/stream.h" #include "netxx/streamserver.h" #include "netxx/timeout.h" // TODO: things to do that will break protocol compatibility // -- need some way to upgrade anonymous to keyed pull, without user having // to explicitly specify which they want // just having a way to respond "access denied, try again" might work // but perhaps better to have the anonymous command include a note "I // _could_ use key <...> if you prefer", and if that would lead to more // access, could reply "I do prefer". (Does this lead to too much // information exposure? Allows anonymous people to probe what branches // a key has access to.) // -- "warning" packet type? // -- Richard Levitte wants, when you (e.g.) request '*' but don't access to // all of it, you just get the parts you have access to (maybe with // warnings about skipped branches). 
to do this right, should have a way // for the server to send back to the client "right, you're not getting // the following branches: ...", so the client will not include them in // its merkle trie. // -- add some sort of vhost field to the client's first packet, saying who // they expect to talk to // -- connection teardown is flawed: // -- simple bug: often connections "fail" even though they succeeded. // should figure out why. (Possibly one side doesn't wait for their // goodbye packet to drain before closing the socket?) // -- subtle misdesign: "goodbye" packets indicate completion of data // transfer. they do not indicate that data has been written to // disk. there should be some way to indicate that data has been // successfully written to disk. See message (and thread) // <E0420553-34F3-45E8-9DA4-D8A5CB9B0600@hsdev.com> on // monotone-devel. // -- apparently we have a IANA approved port: 4691. I guess we should // switch to using that. // (It's registered under the name "netsync". "monotone" would probably // be better, but I don't know how possible it is to change this...) // // this is the "new" network synchronization (netsync) system in // monotone. it is based on synchronizing a pair of merkle trees over an // interactive connection. // // a netsync process between peers treats each peer as either a source, a // sink, or both. when a peer is only a source, it will not write any new // items to its database. when a peer is only a sink, it will not send any // items from its database. when a peer is both a source and sink, it may // send and write items freely. // // the post-state of a netsync is that each sink contains a superset of the // items in its corresponding source; when peers are behaving as both // source and sink, this means that the post-state of the sync is for the // peers to have identical item sets. // // a peer can be a sink in at most one netsync process at a time; it can // however be a source for multiple netsyncs simultaneously. 
// // // data structure // -------------- // // each node in a merkle tree contains a fixed number of slots. this number // is derived from a global parameter of the protocol -- the tree fanout -- // such that the number of slots is 2^fanout. for now we will assume that // fanout is 4 thus there are 16 slots in a node, because this makes // illustration easier. the other parameter of the protocol is the size of // a hash; we use SHA1 so the hash is 20 bytes (160 bits) long. // // each slot in a merkle tree node is in one of 4 states: // // - empty // - live leaf // - dead leaf // - subtree // // in addition, each live or dead leaf contains a hash code which // identifies an element of the set being synchronized. each subtree slot // contains a hash code of the node immediately beneath it in the merkle // tree. empty slots contain no hash codes. // // each node also summarizes, for sake of statistic-gathering, the number // of set elements and total number of bytes in all of its subtrees, each // stored as a size_t and sent as a uleb128. // // since empty slots have no hash code, they are represented implicitly by // a bitmap at the head of each merkle tree node. as an additional // integrity check, each merkle tree node contains a label indicating its // prefix in the tree, and a hash of its own contents. // // in total, then, the byte-level representation of a <160,4> merkle tree // node is as follows: // // 20 bytes - hash of the remaining bytes in the node // 1 byte - type of this node (manifest, file, key, mcert, fcert) // 1-N bytes - level of this node in the tree (0 == "root", uleb128) // 0-20 bytes - the prefix of this node, 4 bits * level, // rounded up to a byte // 1-N bytes - number of leaves under this node (uleb128) // 4 bytes - slot-state bitmap of the node // 0-320 bytes - between 0 and 16 live slots in the node // // so, in the worst case such a node is 367 bytes, with these parameters. 
// // // protocol // -------- // // The protocol is a simple binary command-packet system over TCP; // each packet consists of a single byte which identifies the protocol // version, a byte which identifies the command name inside that // version, a size_t sent as a uleb128 indicating the length of the // packet, that many bytes of payload, and finally 20 bytes of SHA-1 // HMAC calculated over the payload. The key for the SHA-1 HMAC is 20 // bytes of 0 during authentication, and a 20-byte random key chosen // by the client after authentication (discussed below). // decoding involves simply buffering until a sufficient number of bytes are // received, then advancing the buffer pointer. any time an integrity check // (the HMAC) fails, the protocol is assumed to have lost synchronization, and // the connection is dropped. the parties are free to drop the tcp stream at // any point, if too much data is received or too much idle time passes; no // commitments or transactions are made. // // one special command, "bye", is used to shut down a connection // gracefully. once each side has received all the data they want, they // can send a "bye" command to the other side. as soon as either side has // both sent and received a "bye" command, they drop the connection. if // either side sees an i/o failure (dropped connection) after they have // sent a "bye" command, they consider the shutdown successful. // // the exchange begins in a non-authenticated state. the server sends a // "hello <id> <nonce>" command, which identifies the server's RSA key and // issues a nonce which must be used for a subsequent authentication. 
// // The client then responds with either: // // An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id> // <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the // role it wishes to play in the synchronization, identifies the pattern it // wishes to sync with, signs the previous nonce with its own key, and informs // the server of the HMAC key it wishes to use for this session (encrypted // with the server's public key); or // // An "anonymous (source|sink|both) <include_pattern> <exclude_pattern> // <hmac key>" command, which identifies the role it wishes to play in the // synchronization, the pattern it wishes to sync with, and the HMAC key it // wishes to use for this session (also encrypted with the server's public // key). // // The server then replies with a "confirm" command, which contains no // other data but will only have the correct HMAC integrity code if // the server received and properly decrypted the HMAC key offered by // the client. This transitions the peers into an authenticated state // and begins refinement. // // refinement begins with the client sending its root public key and // manifest certificate merkle nodes to the server. the server then // compares the root to each slot in *its* root node, and for each slot // either sends refined subtrees to the client, or (if it detects a missing // item in one pattern or the other) sends either "data" or "send_data" // commands corresponding to the role of the missing item (source or // sink). the client then receives each refined subtree and compares it // with its own, performing similar description/request behavior depending // on role, and the cycle continues. // // detecting the end of refinement is subtle: after sending the refinement // of the root node, the server sends a "done 0" command (queued behind all // the other refinement traffic). when either peer receives a "done N" // command it immediately responds with a "done N+1" command. 
// when two done commands for a given merkle tree arrive with no intervening
// refinements, the entire merkle tree is considered complete.
//
// any "send_data" command received prompts a "data" command in response,
// if the requested item exists. if an item does not exist, a "nonexistant"
// response command is sent.
//
// once a response is received for each requested key and revision cert
// (either data or nonexistant) the requesting party walks the graph of
// received revision certs and transmits send_data or send_delta commands
// for all the revisions mentioned in the certs which it does not already
// have in its database.
//
// for each revision it receives, the recipient requests all the file data or
// deltas described in that revision.
//
// once all requested files, manifests, revisions and certs are received (or
// noted as nonexistant), the recipient closes its connection.
//
// (aside: this protocol is raw binary because coding density is actually
// important here, and each packet consists of very information-dense
// material that you wouldn't have a hope of typing in manually anyways)
//

using namespace std;
using boost::shared_ptr;
using boost::lexical_cast;

// decoding guard: throws bad_decode, naming 'context' in the message, when
// 'check' is false; does nothing otherwise.
static inline void
require(bool check, string const & context)
{
  if (!check)
    throw bad_decode(F("check of '%s' failed") % context);
}

// refinement progress flags kept per item type (see
// session::done_refinements). both flags start out false.
struct
done_marker
{
  bool current_level_had_refinements;
  bool tree_is_done;
  done_marker() :
    current_level_had_refinements(false),
    tree_is_done(false)
  {}
};

// all state belonging to one netsync connection: socket and i/o buffers,
// authentication and HMAC state, progress tickers, and the bookkeeping of
// which items were requested / received.
//
// NOTE: members are initialized in declaration order, and the constructor's
// initializer list is written in that same order; keep the two in step when
// adding or moving members.
struct
session
{
  protocol_role role;
  protocol_voice const voice;
  // NOTE: held by reference, not copied -- the utf8 objects handed to the
  // constructor must outlive the session.
  utf8 const & our_include_pattern;
  utf8 const & our_exclude_pattern;
  // built in the constructor from the two patterns above; used to decide
  // whether a branch name is wanted.
  globish_matcher our_matcher;
  app_state & app;

  string peer_id;
  Netxx::socket_type fd;
  Netxx::Stream str;

  string_queue inbuf;
  // deque of pair<string data, size_t cur_pos>
  deque< pair<string,size_t> > outbuf;
  // the total data stored in outbuf - this is
  // used as a valve to stop too much data
  // backing up
  size_t outbuf_size;

  netcmd cmd;
  bool armed;
  bool arm();

  id remote_peer_key_hash;
  rsa_keypair_id remote_peer_key_name;
  netsync_session_key session_key;
  // separate HMAC chains for each direction; both are re-keyed together by
  // set_session_key().
  chained_hmac read_hmac;
  chained_hmac write_hmac;
  bool authenticated;

  // refreshed by mark_recent_io() on every successful read or write.
  time_t last_io_time;
  // tickers are NULL until setup_client_tickers() creates the ones that
  // apply to our role; every user checks for NULL before bumping one.
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;
  auto_ptr<ticker> revision_checked_ticker;

  // filled in by the *_written_callback methods; the destructor walks these
  // to fire the "note_netsync_*_received" lua hooks.
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  map<netcmd_item_type, boost::shared_ptr<merkle_table> > merkle_tables;

  // the constructor creates one entry per item type in each of these maps;
  // the note_item_* methods assert that the entry exists.
  map<netcmd_item_type, done_marker> done_refinements;
  map<netcmd_item_type, boost::shared_ptr< set<id> > > requested_items;
  map<netcmd_item_type, boost::shared_ptr< set<id> > > received_items;
  map<netcmd_item_type, boost::shared_ptr< set<id> > > full_delta_items;
  map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestry;
  map<revision_id, map<cert_name, vector<cert> > > received_certs;
  set< pair<id, id> > reverse_delta_requests;
  // set once analyze_ancestry_graph() has run its fetch, so it runs only once.
  bool analyzed_ancestry;

  // the nonce handed out by mk_nonce(); empty until then.
  id saved_nonce;
  bool received_goodbye;
  bool sent_goodbye;

  packet_db_valve dbw;

  session(protocol_role role,
          protocol_voice voice,
          utf8 const & our_include_pattern,
          utf8 const & our_exclude_pattern,
          app_state & app,
          string const & peer,
          Netxx::socket_type sock,
          Netxx::Timeout const & to);

  virtual ~session();

  // callbacks registered with dbw in the constructor.
  void rev_written_callback(revision_id rid);
  void key_written_callback(rsa_keypair_id kid);
  void cert_written_callback(cert const & c);

  id mk_nonce();
  void mark_recent_io();

  void set_session_key(string const & key);
  void set_session_key(rsa_oaep_sha_data const & key_encrypted);

  void setup_client_tickers();

  bool done_all_refinements();
  bool cert_refinement_done();
  bool all_requested_revisions_received();

  void note_item_requested(netcmd_item_type ty, id const & i);
  void note_item_full_delta(netcmd_item_type ty, id const & ident);
  bool item_already_requested(netcmd_item_type ty, id const & i);
  void note_item_arrived(netcmd_item_type ty, id const & i);

  void maybe_note_epochs_finished();

  void note_item_sent(netcmd_item_type ty, id const & i);

  bool got_all_data();
  void maybe_say_goodbye();

  void get_heads_and_consume_certs(set<revision_id> & heads);

  void analyze_ancestry_graph();
  void analyze_manifest(manifest_map const & man);

  Netxx::Probe::ready_type which_events() const;
  bool read_some();
  bool write_some();

  // set by error(); while it is set, incoming bytes are discarded and newly
  // queued commands are dropped.
  bool encountered_error;
  void error(string const & errmsg);

  void write_netcmd_and_try_flush(netcmd const & cmd);

  // senders: each builds one netcmd and queues it on outbuf.
  void queue_bye_cmd();
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(size_t level, netcmd_item_type type);
  void queue_hello_cmd(id const & server,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           utf8 const & include_pattern,
                           utf8 const & exclude_pattern,
                           id const & nonce2,
                           base64<rsa_pub_key> server_key_encoded);
  void queue_auth_cmd(protocol_role role,
                      utf8 const & include_pattern,
                      utf8 const & exclude_pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      string const & signature,
                      base64<rsa_pub_key> server_key_encoded);
  void queue_confirm_cmd();
  void queue_refine_cmd(merkle_node const & node);
  void queue_send_data_cmd(netcmd_item_type type,
                           id const & item);
  void queue_send_delta_cmd(netcmd_item_type type,
                            id const & base,
                            id const & ident);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);
  void queue_nonexistant_cmd(netcmd_item_type type,
                             id const & item);

  // receivers: one handler per incoming command type.
  // NOTE(review): the meaning of the bool result is not visible in this
  // part of the file -- confirm against dispatch_payload()/process().
  bool process_bye_cmd();
  bool process_error_cmd(string const & errmsg);
  bool process_done_cmd(size_t level, netcmd_item_type type);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_anonymous_cmd(protocol_role role,
                             utf8 const & their_include_pattern,
                             utf8 const & their_exclude_pattern);
  bool process_auth_cmd(protocol_role role,
                        utf8 const & their_include_pattern,
                        utf8 const & their_exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        string const & signature);
  bool process_confirm_cmd(string const & signature);
  void respond_to_confirm_cmd();
  bool process_refine_cmd(merkle_node const & node);
  bool process_send_data_cmd(netcmd_item_type type,
                             id const & item);
  bool process_send_delta_cmd(netcmd_item_type type,
                              id const & base,
                              id const & ident);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_nonexistant_cmd(netcmd_item_type type,
                               id const & item);

  bool merkle_node_exists(netcmd_item_type type,
                          size_t level,
                          prefix const & pref);

  void load_merkle_node(netcmd_item_type type,
                        size_t level,
                        prefix const & pref,
                        merkle_ptr & node);

  void rebuild_merkle_trees(app_state & app,
                            set<utf8> const & branches);

  bool dispatch_payload(netcmd const & cmd);
  void begin_service();
  bool process();
};

// helper that is constructed with a reference to a session (see
// session::analyze_ancestry_graph); it records delta relationships between
// files and manifests and offers request_* methods for fetching them.
struct
ancestry_fetcher
{
  session & sess;

  // map children to parents
  multimap< file_id, file_id > rev_file_deltas;
  multimap< manifest_id, manifest_id > rev_manifest_deltas;
  // map an ancestor to a child
  multimap< file_id, file_id > fwd_file_deltas;
  multimap< manifest_id, manifest_id > fwd_manifest_deltas;

  set< file_id > seen_files;

  ancestry_fetcher(session & s);
  // analysing the ancestry graph
  void traverse_files(change_set const & cset);
  void traverse_manifest(manifest_id const & child_man,
                         manifest_id const & parent_man);
  void traverse_ancestry(set<revision_id> const & heads);

  // requesting the data
  void request_rev_file_deltas(file_id const & start,
                               set<file_id> & done_files);
  void request_files();
  void request_rev_manifest_deltas(manifest_id const & start,
                                   set<manifest_id> & done_manifests);
  void request_manifests();
};

// wrapper whose default constructor yields the empty prefix "", i.e. the
// prefix of the root merkle node.
struct
root_prefix
{
  prefix val;
  root_prefix() : val("")
  {}
};

static root_prefix const &
get_root_prefix()
{
  // this is not a static variable for a bizarre 
reason: mac OSX runs // static initializers in the "wrong" order (application before // libraries), so the initializer for a static string in cryptopp runs // after the initializer for a static variable outside a function // here. therefore encode_hexenc() fails in the static initializer here // and the program crashes. curious, eh? static root_prefix ROOT_PREFIX; return ROOT_PREFIX; } static file_id null_ident; session::session(protocol_role role, protocol_voice voice, utf8 const & our_include_pattern, utf8 const & our_exclude_pattern, app_state & app, string const & peer, Netxx::socket_type sock, Netxx::Timeout const & to) : role(role), voice(voice), our_include_pattern(our_include_pattern), our_exclude_pattern(our_exclude_pattern), our_matcher(our_include_pattern, our_exclude_pattern), app(app), peer_id(peer), fd(sock), str(sock, to), inbuf(), outbuf_size(0), armed(false), remote_peer_key_hash(""), remote_peer_key_name(""), session_key(constants::netsync_key_initializer), read_hmac(constants::netsync_key_initializer), write_hmac(constants::netsync_key_initializer), authenticated(false), last_io_time(::time(NULL)), byte_in_ticker(NULL), byte_out_ticker(NULL), cert_in_ticker(NULL), cert_out_ticker(NULL), revision_in_ticker(NULL), revision_out_ticker(NULL), revision_checked_ticker(NULL), analyzed_ancestry(false), saved_nonce(""), received_goodbye(false), sent_goodbye(false), dbw(app, true), encountered_error(false) { dbw.set_on_revision_written(boost::bind(&session::rev_written_callback, this, _1)); dbw.set_on_cert_written(boost::bind(&session::cert_written_callback, this, _1)); dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback, this, _1)); done_refinements.insert(make_pair(cert_item, done_marker())); done_refinements.insert(make_pair(key_item, done_marker())); done_refinements.insert(make_pair(epoch_item, done_marker())); requested_items.insert(make_pair(cert_item, boost::shared_ptr< set<id> >(new set<id>()))); 
requested_items.insert(make_pair(key_item, boost::shared_ptr< set<id> >(new set<id>()))); requested_items.insert(make_pair(revision_item, boost::shared_ptr< set<id> >(new set<id>()))); requested_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>()))); requested_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>()))); requested_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>()))); received_items.insert(make_pair(cert_item, boost::shared_ptr< set<id> >(new set<id>()))); received_items.insert(make_pair(key_item, boost::shared_ptr< set<id> >(new set<id>()))); received_items.insert(make_pair(revision_item, boost::shared_ptr< set<id> >(new set<id>()))); received_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>()))); received_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>()))); received_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>()))); full_delta_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>()))); full_delta_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>()))); } session::~session() { vector<cert> unattached_certs; map<revision_id, vector<cert> > revcerts; for (vector<revision_id>::iterator i = written_revisions.begin(); i != written_revisions.end(); ++i) revcerts.insert(make_pair(*i, vector<cert>())); for (vector<cert>::iterator i = written_certs.begin(); i != written_certs.end(); ++i) { map<revision_id, vector<cert> >::iterator j; j = revcerts.find(i->ident); if (j == revcerts.end()) unattached_certs.push_back(*i); else j->second.push_back(*i); } //Keys for (vector<rsa_keypair_id>::iterator i = written_keys.begin(); i != written_keys.end(); ++i) { app.lua.hook_note_netsync_pubkey_received(*i); } //Revisions for (vector<revision_id>::iterator i = written_revisions.begin(); i != written_revisions.end(); ++i) { vector<cert> & ctmp(revcerts[*i]); 
set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs; for (vector<cert>::const_iterator j = ctmp.begin(); j != ctmp.end(); ++j) { cert_value vtmp; decode_base64(j->value, vtmp); certs.insert(make_pair(j->key, make_pair(j->name, vtmp))); } app.lua.hook_note_netsync_revision_received(*i, certs); } //Certs (not attached to a new revision) for (vector<cert>::iterator i = unattached_certs.begin(); i != unattached_certs.end(); ++i) { cert_value tmp; decode_base64(i->value, tmp); app.lua.hook_note_netsync_cert_received(i->ident, i->key, i->name, tmp); } } void session::rev_written_callback(revision_id rid) { if (revision_checked_ticker.get()) ++(*revision_checked_ticker); written_revisions.push_back(rid); } void session::key_written_callback(rsa_keypair_id kid) { written_keys.push_back(kid); } void session::cert_written_callback(cert const & c) { written_certs.push_back(c); } id session::mk_nonce() { I(this->saved_nonce().size() == 0); char buf[constants::merkle_hash_length_in_bytes]; Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf), constants::merkle_hash_length_in_bytes); this->saved_nonce = string(buf, buf + constants::merkle_hash_length_in_bytes); I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes); return this->saved_nonce; } void session::mark_recent_io() { last_io_time = ::time(NULL); } void session::set_session_key(string const & key) { session_key = netsync_session_key(key); read_hmac.set_key(session_key); write_hmac.set_key(session_key); } void session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted) { base64< arc4<rsa_priv_key> > our_priv; load_priv_key(app, app.signing_key, our_priv); string hmac_key; decrypt_rsa(app.lua, app.signing_key, our_priv, hmac_key_encrypted, hmac_key); set_session_key(hmac_key); } void session::setup_client_tickers() { // xgettext: please use short message and try to avoid multibytes chars byte_in_ticker.reset(new ticker(_("bytes in"), ">", 1024, true)); // xgettext: please 
use short message and try to avoid multibytes chars byte_out_ticker.reset(new ticker(_("bytes out"), "<", 1024, true)); if (role == sink_role) { // xgettext: please use short message and try to avoid multibytes chars revision_checked_ticker.reset(new ticker(_("revs written"), "w", 1)); // xgettext: please use short message and try to avoid multibytes chars cert_in_ticker.reset(new ticker(_("certs in"), "c", 3)); // xgettext: please use short message and try to avoid multibytes chars revision_in_ticker.reset(new ticker(_("revs in"), "r", 1)); } else if (role == source_role) { // xgettext: please use short message and try to avoid multibytes chars cert_out_ticker.reset(new ticker(_("certs out"), "C", 3)); // xgettext: please use short message and try to avoid multibytes chars revision_out_ticker.reset(new ticker(_("revs out"), "R", 1)); } else { I(role == source_and_sink_role); // xgettext: please use short message and try to avoid multibytes chars revision_checked_ticker.reset(new ticker(_("revs written"), "w", 1)); // xgettext: please use short message and try to avoid multibytes chars revision_in_ticker.reset(new ticker(_("revs in"), "r", 1)); // xgettext: please use short message and try to avoid multibytes chars revision_out_ticker.reset(new ticker(_("revs out"), "R", 1)); } } bool session::done_all_refinements() { bool all = true; for (map<netcmd_item_type, done_marker>::const_iterator j = done_refinements.begin(); j != done_refinements.end(); ++j) { if (j->second.tree_is_done == false) all = false; } return all; } bool session::cert_refinement_done() { return done_refinements[cert_item].tree_is_done; } bool session::got_all_data() { for (map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i = requested_items.begin(); i != requested_items.end(); ++i) { if (! 
i->second->empty()) return false; } return true; } bool session::all_requested_revisions_received() { map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i = requested_items.find(revision_item); I(i != requested_items.end()); return i->second->empty(); } void session::maybe_note_epochs_finished() { map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i = requested_items.find(epoch_item); I(i != requested_items.end()); // Maybe there are outstanding epoch requests. if (!i->second->empty()) return; // And maybe we haven't even finished the refinement. if (!done_refinements[epoch_item].tree_is_done) return; // But otherwise, we're ready to go! L(F("all epochs processed, opening database valve\n")); this->dbw.open_valve(); } void session::note_item_requested(netcmd_item_type ty, id const & ident) { map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i = requested_items.find(ty); I(i != requested_items.end()); i->second->insert(ident); } void session::note_item_full_delta(netcmd_item_type ty, id const & ident) { map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i = full_delta_items.find(ty); I(i != full_delta_items.end()); i->second->insert(ident); } void session::note_item_arrived(netcmd_item_type ty, id const & ident) { map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i = requested_items.find(ty); I(i != requested_items.end()); i->second->erase(ident); map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator j = received_items.find(ty); I(j != received_items.end()); j->second->insert(ident); switch (ty) { case cert_item: if (cert_in_ticker.get() != NULL) ++(*cert_in_ticker); break; case revision_item: if (revision_in_ticker.get() != NULL) ++(*revision_in_ticker); break; default: // No ticker for other things. 
break; } } bool session::item_already_requested(netcmd_item_type ty, id const & ident) { map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i; i = requested_items.find(ty); I(i != requested_items.end()); if (i->second->find(ident) != i->second->end()) return true; i = received_items.find(ty); I(i != received_items.end()); if (i->second->find(ident) != i->second->end()) return true; return false; } void session::note_item_sent(netcmd_item_type ty, id const & ident) { switch (ty) { case cert_item: if (cert_out_ticker.get() != NULL) ++(*cert_out_ticker); break; case revision_item: if (revision_out_ticker.get() != NULL) ++(*revision_out_ticker); break; default: // No ticker for other things. break; } } void session::write_netcmd_and_try_flush(netcmd const & cmd) { if (!encountered_error) { string buf; cmd.write(buf, write_hmac); outbuf.push_back(make_pair(buf, 0)); outbuf_size += buf.size(); } else L(F("dropping outgoing netcmd (because we're in error unwind mode)\n")); // FIXME: this helps keep the protocol pipeline full but it seems to // interfere with initial and final sequences. careful with it. // write_some(); // read_some(); } // This method triggers a special "error unwind" mode to netsync. In this // mode, all received data is ignored, and no new data is queued. We simply // stay connected long enough for the current write buffer to be flushed, to // ensure that our peer receives the error message. // WARNING WARNING WARNING (FIXME): this does _not_ throw an exception. if // while processing any given netcmd packet you encounter an error, you can // _only_ call this method if you have not touched the database, because if // you have touched the database then you need to throw an exception to // trigger a rollback. 
// you could, of course, call this method and then throw an exception, but // there is no point in doing that, because throwing the exception will cause // the connection to be immediately terminated, so your call to error() will // actually have no effect (except to cause your error message to be printed // twice). void session::error(std::string const & errmsg) { W(F("error: %s\n") % errmsg); queue_error_cmd(errmsg); encountered_error = true; } void session::analyze_manifest(manifest_map const & man) { L(F("analyzing %d entries in manifest\n") % man.size()); for (manifest_map::const_iterator i = man.begin(); i != man.end(); ++i) { if (! this->app.db.file_version_exists(manifest_entry_id(i))) { id tmp; decode_hexenc(manifest_entry_id(i).inner(), tmp); queue_send_data_cmd(file_item, tmp); } } } inline static id plain_id(manifest_id const & i) { id tmp; hexenc<id> htmp(i.inner()); decode_hexenc(htmp, tmp); return tmp; } inline static id plain_id(file_id const & i) { id tmp; hexenc<id> htmp(i.inner()); decode_hexenc(htmp, tmp); return tmp; } void session::get_heads_and_consume_certs( set<revision_id> & heads ) { typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT; typedef map<cert_name, vector<cert> > cert_map; set<revision_id> nodes, parents; map<revision_id, int> chld_num; L(F("analyzing %d ancestry edges\n") % ancestry.size()); for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i) { nodes.insert(i->first); for (edge_map::const_iterator j = i->second->second.edges.begin(); j != i->second->second.edges.end(); ++j) { parents.insert(edge_old_revision(j)); map<revision_id, int>::iterator n; n = chld_num.find(edge_old_revision(j)); if (n == chld_num.end()) chld_num.insert(make_pair(edge_old_revision(j), 1)); else ++(n->second); } } set_difference(nodes.begin(), nodes.end(), parents.begin(), parents.end(), inserter(heads, heads.begin())); L(F("intermediate set_difference heads size %d") % heads.size()); // 
Write permissions checking: // remove heads w/o proper certs, add their children to heads // 1) remove unwanted branch certs from consideration // 2) remove heads w/o a branch tag, process new exposed heads // 3) repeat 2 until no change //1 set<string> ok_branches, bad_branches; cert_name bcert_name(branch_cert_name); cert_name tcert_name(tag_cert_name); for (map<revision_id, cert_map>::iterator i = received_certs.begin(); i != received_certs.end(); ++i) { //branches vector<cert> & bcerts(i->second[bcert_name]); vector<cert> keeping; for (vector<cert>::iterator j = bcerts.begin(); j != bcerts.end(); ++j) { cert_value name; decode_base64(j->value, name); if (ok_branches.find(name()) != ok_branches.end()) keeping.push_back(*j); else if (bad_branches.find(name()) != bad_branches.end()) ; else { if (our_matcher(name())) { ok_branches.insert(name()); keeping.push_back(*j); } else { bad_branches.insert(name()); W(F("Dropping branch certs for unwanted branch %s") % name); } } } bcerts = keeping; } //2 list<revision_id> tmp; for (set<revision_id>::iterator i = heads.begin(); i != heads.end(); ++i) { if (!received_certs[*i][bcert_name].size()) tmp.push_back(*i); } for (list<revision_id>::iterator i = tmp.begin(); i != tmp.end(); ++i) heads.erase(*i); L(F("after step 2, heads size %d") % heads.size()); //3 while (tmp.size()) { ancestryT::const_iterator i = ancestry.find(tmp.front()); if (i != ancestry.end()) { for (edge_map::const_iterator j = i->second->second.edges.begin(); j != i->second->second.edges.end(); ++j) { if (!--chld_num[edge_old_revision(j)]) { if (received_certs[i->first][bcert_name].size()) heads.insert(i->first); else tmp.push_back(edge_old_revision(j)); } } // since we don't want this rev, we don't want it's certs either received_certs[tmp.front()] = cert_map(); } tmp.pop_front(); } L(F("after step 3, heads size %d") % heads.size()); // We've reduced the certs to those we want now, send them to dbw. 
for (map<revision_id, cert_map>::iterator i = received_certs.begin(); i != received_certs.end(); ++i) { for (cert_map::iterator j = i->second.begin(); j != i->second.end(); ++j) { for (vector<cert>::iterator k = j->second.begin(); k != j->second.end(); ++k) { this->dbw.consume_revision_cert(revision<cert>(*k)); } } } } void session::analyze_ancestry_graph() { L(F("analyze_ancestry_graph")); if (! (all_requested_revisions_received() && cert_refinement_done())) { L(F("not all done in analyze_ancestry_graph")); return; } if (analyzed_ancestry) { L(F("already analyzed_ancestry in analyze_ancestry_graph")); return; } L(F("analyze_ancestry_graph fetching")); ancestry_fetcher fetch(*this); analyzed_ancestry = true; } Netxx::Probe::ready_type session::which_events() const { if (outbuf.empty()) { if (inbuf.size() < constants::netcmd_maxsz) return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd; else return Netxx::Probe::ready_oobd; } else { if (inbuf.size() < constants::netcmd_maxsz) return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd; else return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd; } } bool session::read_some() { I(inbuf.size() < constants::netcmd_maxsz); char tmp[constants::bufsz]; Netxx::signed_size_type count = str.read(tmp, sizeof(tmp)); if (count > 0) { L(F("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id); if (encountered_error) { L(F("in error unwind mode, so throwing them into the bit bucket\n")); return true; } inbuf.append(tmp,count); mark_recent_io(); if (byte_in_ticker.get() != NULL) (*byte_in_ticker) += count; return true; } else return false; } bool session::write_some() { I(!outbuf.empty()); size_t writelen = outbuf.front().first.size() - outbuf.front().second; Netxx::signed_size_type count = str.write(outbuf.front().first.data() + outbuf.front().second, std::min(writelen, constants::bufsz)); if (count > 0) { if ((size_t)count == writelen) { outbuf_size -= outbuf.front().first.size(); 
outbuf.pop_front();
        }
      else
        {
          // partial write: remember how far into the front buffer we got
          outbuf.front().second += count;
        }
      L(F("wrote %d bytes to fd %d (peer %s)\n")
        % count % fd % peer_id);
      mark_recent_io();
      if (byte_out_ticker.get() != NULL)
        (*byte_out_ticker) += count;
      if (encountered_error && outbuf.empty())
        {
          // we've flushed our error message, so it's time to get out.
          L(F("finished flushing output queue in error unwind mode, disconnecting\n"));
          return false;
        }
      return true;
    }
  else
    return false;
}

// senders

// queue a "bye" command and note that we have said goodbye.
void
session::queue_bye_cmd()
{
  L(F("queueing 'bye' command\n"));
  // NOTE: this local deliberately shadows the 'cmd' member of session.
  netcmd cmd;
  cmd.write_bye_cmd();
  write_netcmd_and_try_flush(cmd);
  this->sent_goodbye = true;
}

// queue an "error" command carrying 'errmsg'. this also counts as having
// said goodbye.
void
session::queue_error_cmd(string const & errmsg)
{
  L(F("queueing 'error' command\n"));
  // NOTE: this local deliberately shadows the 'cmd' member of session.
  netcmd cmd;
  cmd.write_error_cmd(errmsg);
  write_netcmd_and_try_flush(cmd);
  this->sent_goodbye = true;
}

void
session::queue_done_cmd(size_t level,
                        netcmd_item_type type)
{
  string typestr;
  netcmd_item_type_to_string(type, typestr);
  L(F("queueing 'done' command for %s level %s\n")
    % typestr % level);
  netcmd cmd;
  cmd