Below is the file 'netsync.cc' from this revision. You can also download the file.
// Copyright (C) 2008 Stephen Leake <stephen_leake@stephe-leake.org> // Copyright (C) 2004 Graydon Hoare <graydon@pobox.com> // // This program is made available under the GNU GPL version 2.0 or // greater. See the accompanying file COPYING for details. // // This program is distributed WITHOUT ANY WARRANTY; without even the // implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR // PURPOSE. #include "base.hh" #include <map> #include <cstdlib> #include <memory> #include <list> #include <deque> #include <stack> #include <time.h> #include "lexical_cast.hh" #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> #include "lua_hooks.hh" #include "key_store.hh" #include "project.hh" #include "database.hh" #include "cert.hh" #include "constants.hh" #include "enumerator.hh" #include "keys.hh" #include "lua.hh" #include "merkle_tree.hh" #include "netcmd.hh" #include "netio.hh" #include "numeric_vocab.hh" #include "refiner.hh" #include "revision.hh" #include "sanity.hh" #include "transforms.hh" #include "ui.hh" #include "xdelta.hh" #include "epoch.hh" #include "platform.hh" #include "hmac.hh" #include "globish.hh" #include "uri.hh" #include "options.hh" #include "botan/botan.h" #include "netxx/address.h" #include "netxx/peer.h" #include "netxx/probe.h" #include "netxx/socket.h" #include "netxx/sockopt.h" #include "netxx/stream.h" #include "netxx/streamserver.h" #include "netxx/timeout.h" #include "netxx_pipe.hh" // TODO: things to do that will break protocol compatibility // -- need some way to upgrade anonymous to keyed pull, without user having // to explicitly specify which they want // just having a way to respond "access denied, try again" might work // but perhaps better to have the anonymous command include a note "I // _could_ use key <...> if you prefer", and if that would lead to more // access, could reply "I do prefer". (Does this lead to too much // information exposure? 
//      Allows anonymous people to probe what branches
//      a key has access to.)
//   -- "warning" packet type?
//   -- Richard Levitte wants, when you (e.g.) request '*' but don't have
//      access to all of it, you just get the parts you have access to
//      (maybe with warnings about skipped branches).  To do this right,
//      should have a way for the server to send back to the client "right,
//      you're not getting the following branches: ...", so the client will
//      not include them in its merkle trie.
//   -- add some sort of vhost field to the client's first packet, saying who
//      they expect to talk to
//
// This is the "new" network synchronization (netsync) system in
// monotone. It is based on synchronizing pairs of merkle trees over an
// interactive connection.
//
// A netsync process between peers treats each peer as either a source, a
// sink, or both. When a peer is only a source, it will not write any new
// items to its database. When a peer is only a sink, it will not send any
// items from its database. When a peer is both a source and sink, it may
// send and write items freely.
//
// The post-state of a netsync is that each sink contains a superset of the
// items in its corresponding source; when peers are behaving as both
// source and sink, this means that the post-state of the sync is for the
// peers to have identical item sets.
//
//
// Data structure
// --------------
//
// Each node in a merkle tree contains a fixed number of slots. This number
// is derived from a global parameter of the protocol -- the tree fanout --
// such that the number of slots is 2^fanout. For now we will assume that
// fanout is 4, thus there are 16 slots in a node, because this makes
// illustration easier. The other parameter of the protocol is the size of
// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
//
// Each slot in a merkle tree node is in one of 3 states:
//
//   - empty
//   - leaf
//   - subtree
//
// In addition, each leaf contains a hash code which identifies an element
// of the set being synchronized. Each subtree slot contains a hash code of
// the node immediately beneath it in the merkle tree. Empty slots contain
// no hash codes.
//
// Since empty slots have no hash code, they are represented implicitly by
// a bitmap at the head of each merkle tree node. As an additional
// integrity check, each merkle tree node contains a label indicating its
// prefix in the tree, and a hash of its own contents.
//
// In total, then, the byte-level representation of a <160,4> merkle tree
// node is as follows:
//
//      20 bytes  - hash of the remaining bytes in the node
//       1 byte   - type of this node (manifest, file, key, mcert, fcert)
//     1-N bytes  - level of this node in the tree (0 == "root", uleb128)
//    0-20 bytes  - the prefix of this node, 4 bits * level,
//                  rounded up to a byte
//     1-N bytes  - number of leaves under this node (uleb128)
//       4 bytes  - slot-state bitmap of the node
//   0-320 bytes  - between 0 and 16 live slots in the node
//
// So, in the worst case such a node is 367 bytes, with these parameters.
//
//
// Protocol
// --------
//
// The protocol is a binary command-packet system over TCP; each packet
// consists of a single byte which identifies the protocol version, a byte
// which identifies the command name inside that version, a size_t sent as
// a uleb128 indicating the length of the packet, that many bytes of
// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
// 20-byte random key chosen by the client after authentication (discussed
// below). Decoding involves simply buffering until a sufficient number of
// bytes are received, then advancing the buffer pointer.
// Any time an
// integrity check (the HMAC) fails, the protocol is assumed to have lost
// synchronization, and the connection is dropped. The parties are free to
// drop the TCP stream at any point, if too much data is received or too
// much idle time passes; no commitments or transactions are made.
//
//
// Authentication and setup
// ------------------------
//
// The exchange begins in a non-authenticated state. The server sends a
// "hello <id> <nonce>" command, which identifies the server's RSA key and
// issues a nonce which must be used for a subsequent authentication.
//
// The client then responds with either:
//
// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
// role it wishes to play in the synchronization, identifies the pattern it
// wishes to sync with, signs the previous nonce with its own key, and informs
// the server of the HMAC key it wishes to use for this session (encrypted
// with the server's public key); or
//
// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
// <hmac key>" command, which identifies the role it wishes to play in the
// synchronization, the pattern it wishes to sync with, and the HMAC key it
// wishes to use for this session (also encrypted with the server's public
// key).
//
// The server then replies with a "confirm" command, which contains no
// other data but will only have the correct HMAC integrity code if the
// server received and properly decrypted the HMAC key offered by the
// client. This transitions the peers into an authenticated state and
// begins epoch refinement. If epoch refinement and epoch transmission
// succeed, the peers switch to data refinement and data transmission.
//
//
// Refinement
// ----------
//
// Refinement is executed by "refiners"; there is a refiner for each
// set of 'items' being exchanged: epochs, keys, certs, and revisions.
// When refinement starts, each party knows only their own set of
// items; when refinement completes, each party has learned of the
// complete set of items it needs to send, and a count of items it's
// expecting to receive.
//
// For more details on the refinement process, see refiner.cc.
//
//
// Transmission
// ------------
//
// Once the set of items to send has been determined (for keys, certs, and
// revisions) each peer switches into a transmission mode. This mode
// involves walking the revision graph in ancestry-order and sending all
// the items the local peer has which the remote one does not. Since the
// remote and local peers both know all the items which need to be
// transferred (they learned during refinement) they know what to wait for
// and what to send. The mechanisms of the transmission phase (notably,
// enumerator.cc) simply ensure that things are sent in the proper order,
// and without over-filling the output buffer too much.
//
//
// Shutdown
// --------
//
// After transmission completes, one special command, "bye", is used to
// shut down a connection gracefully. The shutdown sequence based on "bye"
// commands is documented below in session::process_bye_cmd.
//
//
// Note on epochs
// --------------
//
// One refinement and transmission phase precedes all the others: epochs.
// Epochs are exchanged and compared in order to be sure that further
// refinement and transmission (on certs and revisions) make sense; they
// are a sort of "immune system" to prevent incompatible databases (say
// between rebuilds due to bugs in monotone) from cross-contaminating. The
// later refinements are only kicked off *after* all epochs are received
// and compare correctly.
// // // Note on dense coding // -------------------- // // This protocol is "raw binary" (non-text) because coding density is // actually important here, and each packet consists of very // information-dense material that you wouldn't have a hope of typing in, // or interpreting manually anyways. // using std::auto_ptr; using std::deque; using std::make_pair; using std::map; using std::min; using std::pair; using std::set; using std::string; using std::vector; using boost::shared_ptr; using boost::lexical_cast; struct server_initiated_sync_request { string what; string address; string include; string exclude; }; deque<server_initiated_sync_request> server_initiated_sync_requests; LUAEXT(server_request_sync, ) { char const * w = luaL_checkstring(L, 1); char const * a = luaL_checkstring(L, 2); char const * i = luaL_checkstring(L, 3); char const * e = luaL_checkstring(L, 4); server_initiated_sync_request request; request.what = string(w); request.address = string(a); request.include = string(i); request.exclude = string(e); server_initiated_sync_requests.push_back(request); return 0; } static inline void require(bool check, string const & context) { if (!check) throw bad_decode(F("check of '%s' failed") % context); } static void read_pubkey(string const & in, rsa_keypair_id & id, rsa_pub_key & pub) { string tmp_id, tmp_key; size_t pos = 0; extract_variable_length_string(in, tmp_id, pos, "pubkey id"); extract_variable_length_string(in, tmp_key, pos, "pubkey value"); id = rsa_keypair_id(tmp_id); pub = rsa_pub_key(tmp_key); } static void write_pubkey(rsa_keypair_id const & id, rsa_pub_key const & pub, string & out) { insert_variable_length_string(id(), out); insert_variable_length_string(pub(), out); } struct netsync_error { string msg; netsync_error(string const & s): msg(s) {} }; struct session: public refiner_callbacks, public enumerator_callbacks { protocol_role role; protocol_voice const voice; globish our_include_pattern; globish our_exclude_pattern; 
// (Continuation of struct session: the struct header and its first four
// members are on the preceding source line.)
  globish_matcher our_matcher;

  project_t & project;
  key_store & keys;
  lua_hooks & lua;
  bool use_transport_auth;
  // Both references below are bound by the constructor to members of its
  // `options` argument, so that object has to outlive the session.
  rsa_keypair_id const & signing_key;
  vector<rsa_keypair_id> const & keys_to_push;

  string peer_id;
  shared_ptr<Netxx::StreamBase> str;

  string_queue inbuf;
  // deque of pair<string data, size_t cur_pos>
  deque< pair<string,size_t> > outbuf;
  // the total data stored in outbuf - this is
  // used as a valve to stop too much data
  // backing up
  size_t outbuf_size;

  netcmd cmd;
  bool armed;
  bool arm();

  bool received_remote_key;
  rsa_keypair_id remote_peer_key_name;
  netsync_session_key session_key;
  chained_hmac read_hmac;
  chained_hmac write_hmac;
  bool authenticated;

  // Refreshed by mark_recent_io().
  time_t last_io_time;

  // Progress tickers.  The constructor leaves all of them null;
  // setup_client_tickers() creates the ones that apply to `role`.
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;

  // Running totals; the destructor passes them to
  // lua.hook_note_netsync_end.
  size_t bytes_in, bytes_out;
  size_t certs_in, certs_out;
  size_t revs_in, revs_out;
  size_t keys_in, keys_out;
  // used to identify this session to the netsync hooks.
  // We can't just use saved_nonce, because that's blank for all
  // anonymous connections and could lead to confusion.
  size_t session_id;
  static size_t session_count;

  // These are read from the server, written to the local database
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  // These are sent to the server
  vector<revision_id> sent_revisions;
  vector<rsa_keypair_id> sent_keys;
  vector<cert> sent_certs;

  // Set once by mk_nonce(), which asserts that it was still empty.
  id saved_nonce;

  enum
    {
      working_state,
      shutdown_state,
      confirmed_state
    }
    protocol_state;

  bool encountered_error;

  // Values for error_code.  The destructor reports no_error when the
  // session ended in confirmed_state, and upgrades no_transfer to
  // partial_transfer when any revision, cert or key was counted.
  static const int no_error = 200;
  static const int partial_transfer = 211;
  static const int no_transfer = 212;
  static const int not_permitted = 412;
  static const int unknown_key = 422;
  static const int mixing_versions = 432;
  static const int role_mismatch = 512;
  static const int bad_command = 521;
  static const int failed_identification = 532;
  //static const int bad_data = 541;
  int error_code;

  // True once done_all_refinements() has pushed totals into the tickers.
  bool set_totals;

  // Interface to refinement.
  refiner epoch_refiner;
  refiner key_refiner;
  refiner cert_refiner;
  refiner rev_refiner;

  // Interface to ancestry grovelling.
  revision_enumerator rev_enumerator;

  // Enumerator_callbacks methods.
  set<file_id> file_items_sent;
  bool process_this_rev(revision_id const & rev);
  bool queue_this_cert(id const & c);
  bool queue_this_file(id const & f);
  void note_file_data(file_id const & f);
  void note_file_delta(file_id const & src, file_id const & dst);
  void note_rev(revision_id const & rev);
  void note_cert(id const & c);

  session(options & opts,
          lua_hooks & lua,
          project_t & project,
          key_store & keys,
          protocol_role role,
          protocol_voice voice,
          globish const & our_include_pattern,
          globish const & our_exclude_pattern,
          string const & peer,
          shared_ptr<Netxx::StreamBase> sock,
          bool initiated_by_server = false);

  virtual ~session();

  id mk_nonce();
  void mark_recent_io();

  void set_session_key(string const & key);
  void set_session_key(rsa_oaep_sha_data const & key_encrypted);

  void setup_client_tickers();
  bool done_all_refinements();
  bool queued_all_items();
  bool received_all_items();
  bool finished_working();
  void maybe_step();
  void maybe_say_goodbye(transaction_guard & guard);

  void note_item_arrived(netcmd_item_type ty, id const & i);
  void maybe_note_epochs_finished();
  void note_item_sent(netcmd_item_type ty, id const & i);

  Netxx::Probe::ready_type which_events() const;
  bool read_some();
  bool write_some();

  void error(int errcode, string const & errmsg);

  void write_netcmd_and_try_flush(netcmd const & cmd);

  // Outgoing queue-writers.
  void queue_bye_cmd(u8 phase);
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(netcmd_item_type type, size_t n_items);
  void queue_hello_cmd(rsa_keypair_id const & key_name,
                       rsa_pub_key const & pub_encoded,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           globish const & include_pattern,
                           globish const & exclude_pattern,
                           id const & nonce2);
  void queue_auth_cmd(protocol_role role,
                      globish const & include_pattern,
                      globish const & exclude_pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      rsa_sha1_signature const & signature);
  void queue_confirm_cmd();
  void queue_refine_cmd(refinement_type ty, merkle_node const & node);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);

  // Incoming dispatch-called methods.
  bool process_error_cmd(string const & errmsg);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_bye_cmd(u8 phase, transaction_guard & guard);
  bool process_anonymous_cmd(protocol_role role,
                             globish const & their_include_pattern,
                             globish const & their_exclude_pattern);
  bool process_auth_cmd(protocol_role role,
                        globish const & their_include_pattern,
                        globish const & their_exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        rsa_sha1_signature const & signature);
  bool process_refine_cmd(refinement_type ty, merkle_node const & node);
  bool process_done_cmd(netcmd_item_type type, size_t n_items);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_usher_cmd(utf8 const & msg);

  // The incoming dispatcher.
  bool dispatch_payload(netcmd const & cmd,
                        transaction_guard & guard);

  // Various helpers.
  void assume_corresponding_role(protocol_role their_role);
  void respond_to_confirm_cmd();
  bool data_exists(netcmd_item_type type,
                   id const & item);
  void load_data(netcmd_item_type type,
                 id const & item,
                 string & out);

  void rebuild_merkle_trees(set<branch_name> const & branches);

  void send_all_data(netcmd_item_type ty, set<id> const & items);
  void begin_service();
  bool process(transaction_guard & guard);

  bool initiated_by_server;
};

// Source of session_id values: each constructed session takes the
// pre-incremented count, so the first session gets id 1.
size_t session::session_count = 0;

// Builds a session in its initial state: not armed, not authenticated,
// all transfer counters zero, every ticker null, protocol_state ==
// working_state and error_code == no_transfer.  The session key and both
// HMAC chains start from constants::netsync_key_initializer.
session::session(options & opts,
                 lua_hooks & lua,
                 project_t & project,
                 key_store & keys,
                 protocol_role role,
                 protocol_voice voice,
                 globish const & our_include_pattern,
                 globish const & our_exclude_pattern,
                 string const & peer,
                 shared_ptr<Netxx::StreamBase> sock,
                 bool initiated_by_server) :
  // The constructor parameters shadow the members of the same name, so
  // e.g. role(role) initializes the member from the argument.
  role(role),
  voice(voice),
  our_include_pattern(our_include_pattern),
  our_exclude_pattern(our_exclude_pattern),
  our_matcher(our_include_pattern, our_exclude_pattern),
  project(project),
  keys(keys),
  lua(lua),
  use_transport_auth(opts.use_transport_auth),
  signing_key(opts.signing_key),
  keys_to_push(opts.keys_to_push),
  peer_id(peer),
  str(sock),
  inbuf(),
  outbuf_size(0),
  armed(false),
  received_remote_key(false),
  remote_peer_key_name(""),
  session_key(constants::netsync_key_initializer),
  // use_transport_auth here is the member, which is declared (and thus
  // initialized) before the two HMAC chains.
  read_hmac(netsync_session_key(constants::netsync_key_initializer),
            use_transport_auth),
  write_hmac(netsync_session_key(constants::netsync_key_initializer),
             use_transport_auth),
  authenticated(false),
  last_io_time(::time(NULL)),
  byte_in_ticker(NULL),
  byte_out_ticker(NULL),
  cert_in_ticker(NULL),
  cert_out_ticker(NULL),
  revision_in_ticker(NULL),
  revision_out_ticker(NULL),
  bytes_in(0), bytes_out(0),
  certs_in(0), certs_out(0),
  revs_in(0), revs_out(0),
  keys_in(0), keys_out(0),
  session_id(++session_count),
  saved_nonce(""),
  protocol_state(working_state),
  encountered_error(false),
  error_code(no_transfer),
  set_totals(false),
  epoch_refiner(epoch_item, voice, *this),
  key_refiner(key_item, voice, *this),
  cert_refiner(cert_item, voice, *this),
rev_refiner(revision_item, voice, *this), rev_enumerator(project, *this), initiated_by_server(initiated_by_server) {} session::~session() { if (protocol_state == confirmed_state) error_code = no_error; else if (error_code == no_transfer && (revs_in || revs_out || certs_in || certs_out || keys_in || keys_out)) error_code = partial_transfer; vector<cert> unattached_written_certs; map<revision_id, vector<cert> > rev_written_certs; for (vector<revision_id>::iterator i = written_revisions.begin(); i != written_revisions.end(); ++i) rev_written_certs.insert(make_pair(*i, vector<cert>())); for (vector<cert>::iterator i = written_certs.begin(); i != written_certs.end(); ++i) { map<revision_id, vector<cert> >::iterator j; j = rev_written_certs.find(revision_id(i->ident)); if (j == rev_written_certs.end()) unattached_written_certs.push_back(*i); else j->second.push_back(*i); } if (!written_keys.empty() || !written_revisions.empty() || !written_certs.empty()) { //Keys for (vector<rsa_keypair_id>::iterator i = written_keys.begin(); i != written_keys.end(); ++i) { lua.hook_note_netsync_pubkey_received(*i, session_id); } //Revisions for (vector<revision_id>::iterator i = written_revisions.begin(); i != written_revisions.end(); ++i) { vector<cert> & ctmp(rev_written_certs[*i]); set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs; for (vector<cert>::const_iterator j = ctmp.begin(); j != ctmp.end(); ++j) certs.insert(make_pair(j->key, make_pair(j->name, j->value))); revision_data rdat; project.db.get_revision(*i, rdat); lua.hook_note_netsync_revision_received(*i, rdat, certs, session_id); } //Certs (not attached to a new revision) for (vector<cert>::iterator i = unattached_written_certs.begin(); i != unattached_written_certs.end(); ++i) lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key, i->name, i->value, session_id); } if (!sent_keys.empty() || !sent_revisions.empty() || !sent_certs.empty()) { vector<cert> unattached_sent_certs; map<revision_id, 
vector<cert> > rev_sent_certs; for (vector<revision_id>::iterator i = sent_revisions.begin(); i != sent_revisions.end(); ++i) rev_sent_certs.insert(make_pair(*i, vector<cert>())); for (vector<cert>::iterator i = sent_certs.begin(); i != sent_certs.end(); ++i) { map<revision_id, vector<cert> >::iterator j; j = rev_sent_certs.find(revision_id(i->ident)); if (j == rev_sent_certs.end()) unattached_sent_certs.push_back(*i); else j->second.push_back(*i); } //Keys for (vector<rsa_keypair_id>::iterator i = sent_keys.begin(); i != sent_keys.end(); ++i) { lua.hook_note_netsync_pubkey_sent(*i, session_id); } //Revisions for (vector<revision_id>::iterator i = sent_revisions.begin(); i != sent_revisions.end(); ++i) { vector<cert> & ctmp(rev_sent_certs[*i]); set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs; for (vector<cert>::const_iterator j = ctmp.begin(); j != ctmp.end(); ++j) certs.insert(make_pair(j->key, make_pair(j->name, j->value))); revision_data rdat; project.db.get_revision(*i, rdat); lua.hook_note_netsync_revision_sent(*i, rdat, certs, session_id); } //Certs (not attached to a new revision) for (vector<cert>::iterator i = unattached_sent_certs.begin(); i != unattached_sent_certs.end(); ++i) lua.hook_note_netsync_cert_sent(revision_id(i->ident), i->key, i->name, i->value, session_id); } lua.hook_note_netsync_end(session_id, error_code, bytes_in, bytes_out, certs_in, certs_out, revs_in, revs_out, keys_in, keys_out); } bool session::process_this_rev(revision_id const & rev) { return (rev_refiner.items_to_send.find(rev.inner()) != rev_refiner.items_to_send.end()); } bool session::queue_this_cert(id const & c) { return (cert_refiner.items_to_send.find(c) != cert_refiner.items_to_send.end()); } bool session::queue_this_file(id const & f) { return file_items_sent.find(file_id(f)) == file_items_sent.end(); } void session::note_file_data(file_id const & f) { if (role == sink_role) return; file_data fd; project.db.get_file_version(f, fd); 
queue_data_cmd(file_item, f.inner(), fd.inner()()); file_items_sent.insert(f); } void session::note_file_delta(file_id const & src, file_id const & dst) { if (role == sink_role) return; file_delta fdel; project.db.get_arbitrary_file_delta(src, dst, fdel); queue_delta_cmd(file_item, src.inner(), dst.inner(), fdel.inner()); file_items_sent.insert(dst); } void session::note_rev(revision_id const & rev) { if (role == sink_role) return; revision_t rs; project.db.get_revision(rev, rs); data tmp; write_revision(rs, tmp); queue_data_cmd(revision_item, rev.inner(), tmp()); sent_revisions.push_back(rev); } void session::note_cert(id const & c) { if (role == sink_role) return; revision<cert> cert; string str; project.db.get_revision_cert(c, cert); write_cert(cert.inner(), str); queue_data_cmd(cert_item, c, str); sent_certs.push_back(cert.inner()); } id session::mk_nonce() { I(this->saved_nonce().size() == 0); char buf[constants::merkle_hash_length_in_bytes]; Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf), constants::merkle_hash_length_in_bytes); this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes)); I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes); return this->saved_nonce; } void session::mark_recent_io() { last_io_time = ::time(NULL); } void session::set_session_key(string const & key) { session_key = netsync_session_key(key); read_hmac.set_key(session_key); write_hmac.set_key(session_key); } void session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted) { if (use_transport_auth) { string hmac_key; keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key); set_session_key(hmac_key); } } void session::setup_client_tickers() { // xgettext: please use short message and try to avoid multibytes chars byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true)); // xgettext: please use short message and try to avoid multibytes chars byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 
1024, true)); if (role == sink_role) { // xgettext: please use short message and try to avoid multibytes chars cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3)); // xgettext: please use short message and try to avoid multibytes chars revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1)); } else if (role == source_role) { // xgettext: please use short message and try to avoid multibytes chars cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3)); // xgettext: please use short message and try to avoid multibytes chars revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1)); } else { I(role == source_and_sink_role); // xgettext: please use short message and try to avoid multibytes chars cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3, false, true)); // xgettext: please use short message and try to avoid multibytes chars cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3, false, true)); // xgettext: please use short message and try to avoid multibytes chars revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1)); // xgettext: please use short message and try to avoid multibytes chars revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1)); } } bool session::done_all_refinements() { bool all = rev_refiner.done && cert_refiner.done && key_refiner.done && epoch_refiner.done; if (all && !set_totals) { if (cert_out_ticker.get()) cert_out_ticker->set_total(cert_refiner.items_to_send.size()); if (revision_out_ticker.get()) revision_out_ticker->set_total(rev_refiner.items_to_send.size()); if (cert_in_ticker.get()) cert_in_ticker->set_total(cert_refiner.items_to_receive); if (revision_in_ticker.get()) revision_in_ticker->set_total(rev_refiner.items_to_receive); set_totals = true; } return all; } bool session::received_all_items() { if (role == source_role) return true; bool all = rev_refiner.items_to_receive == 0 && cert_refiner.items_to_receive == 0 && key_refiner.items_to_receive == 0 && epoch_refiner.items_to_receive == 0; 
return all; } bool session::finished_working() { bool all = done_all_refinements() && received_all_items() && queued_all_items() && rev_enumerator.done(); return all; } bool session::queued_all_items() { if (role == sink_role) return true; bool all = rev_refiner.items_to_send.empty() && cert_refiner.items_to_send.empty() && key_refiner.items_to_send.empty() && epoch_refiner.items_to_send.empty(); return all; } void session::maybe_note_epochs_finished() { // Maybe there are outstanding epoch requests. // These only matter if we're in sink or source-and-sink mode. if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role)) return; // And maybe we haven't even finished the refinement. if (!epoch_refiner.done) return; // If we ran into an error -- say a mismatched epoch -- don't do any // further refinements. if (encountered_error) return; // But otherwise, we're ready to go. Start the next // set of refinements. if (voice == client_voice) { L(FL("epoch refinement finished; beginning other refinements")); key_refiner.begin_refinement(); cert_refiner.begin_refinement(); rev_refiner.begin_refinement(); } else L(FL("epoch refinement finished")); } static void decrement_if_nonzero(netcmd_item_type ty, size_t & n) { if (n == 0) { string typestr; netcmd_item_type_to_string(ty, typestr); E(false, F("underflow on count of %s items to receive") % typestr); } --n; if (n == 0) { string typestr; netcmd_item_type_to_string(ty, typestr); L(FL("count of %s items to receive has reached zero") % typestr); } } void session::note_item_arrived(netcmd_item_type ty, id const & ident) { switch (ty) { case cert_item: decrement_if_nonzero(ty, cert_refiner.items_to_receive); if (cert_in_ticker.get() != NULL) ++(*cert_in_ticker); ++certs_in; break; case revision_item: decrement_if_nonzero(ty, rev_refiner.items_to_receive); if (revision_in_ticker.get() != NULL) ++(*revision_in_ticker); ++revs_in; break; case key_item: decrement_if_nonzero(ty, key_refiner.items_to_receive); ++keys_in; 
break; case epoch_item: decrement_if_nonzero(ty, epoch_refiner.items_to_receive); break; default: // No ticker for other things. break; } } void session::note_item_sent(netcmd_item_type ty, id const & ident) { switch (ty) { case cert_item: cert_refiner.items_to_send.erase(ident); if (cert_out_ticker.get() != NULL) ++(*cert_out_ticker); ++certs_out; break; case revision_item: rev_refiner.items_to_send.erase(ident); if (revision_out_ticker.get() != NULL) ++(*revision_out_ticker); ++revs_out; break; case key_item: key_refiner.items_to_send.erase(ident); ++keys_out; break; case epoch_item: epoch_refiner.items_to_send.erase(ident); break; default: // No ticker for other things. break; } } void session::write_netcmd_and_try_flush(netcmd const & cmd) { if (!encountered_error) { string buf; cmd.write(buf, write_hmac); outbuf.push_back(make_pair(buf, 0)); outbuf_size += buf.size(); } else L(FL("dropping outgoing netcmd (because we're in error unwind mode)")); // FIXME: this helps keep the protocol pipeline full but it seems to // interfere with initial and final sequences. careful with it. // write_some(); // read_some(); } // This method triggers a special "error unwind" mode to netsync. In this // mode, all received data is ignored, and no new data is queued. We simply // stay connected long enough for the current write buffer to be flushed, to // ensure that our peer receives the error message. // Affects read_some, write_some, and process . void session::error(int errcode, string const & errmsg) { error_code = errcode; throw netsync_error(errmsg); } Netxx::Probe::ready_type session::which_events() const { // Only ask to read if we're not armed. 
if (outbuf.empty()) { if (inbuf.size() < constants::netcmd_maxsz && !armed) return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd; else return Netxx::Probe::ready_oobd; } else { if (inbuf.size() < constants::netcmd_maxsz && !armed) return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd; else return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd; } } bool session::read_some() { I(inbuf.size() < constants::netcmd_maxsz); char tmp[constants::bufsz]; Netxx::signed_size_type count = str->read(tmp, sizeof(tmp)); if (count > 0) { L(FL("read %d bytes from fd %d (peer %s)") % count % str->get_socketfd() % peer_id); if (encountered_error) { L(FL("in error unwind mode, so throwing them into the bit bucket")); return true; } inbuf.append(tmp,count); mark_recent_io(); if (byte_in_ticker.get() != NULL) (*byte_in_ticker) += count; bytes_in += count; return true; } else return false; } bool session::write_some() { I(!outbuf.empty()); size_t writelen = outbuf.front().first.size() - outbuf.front().second; Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second, min(writelen, constants::bufsz)); if (count > 0) { if ((size_t)count == writelen) { outbuf_size -= outbuf.front().first.size(); outbuf.pop_front(); } else { outbuf.front().second += count; } L(FL("wrote %d bytes to fd %d (peer %s)") % count % str->get_socketfd() % peer_id); mark_recent_io(); if (byte_out_ticker.get() != NULL) (*byte_out_ticker) += count; bytes_out += count; if (encountered_error && outbuf.empty()) { // we've flushed our error message, so it's time to get out. 
L(FL("finished flushing output queue in error unwind mode, disconnecting")); return false; } return true; } else return false; } // senders void session::queue_error_cmd(string const & errmsg) { L(FL("queueing 'error' command")); netcmd cmd; cmd.write_error_cmd(errmsg); write_netcmd_and_try_flush(cmd); } void session::queue_bye_cmd(u8 phase) { L(FL("queueing 'bye' command, phase %d") % static_cast<size_t>(phase)); netcmd cmd; cmd.write_bye_cmd(phase); write_netcmd_and_try_flush(cmd); } void session::queue_done_cmd(netcmd_item_type type, size_t n_items) { string typestr; netcmd_item_type_to_string(type, typestr); L(FL("queueing 'done' command for %s (%d items)") % typestr % n_items); netcmd cmd; cmd.write_done_cmd(type, n_items); write_netcmd_and_try_flush(cmd); } void session::queue_hello_cmd(rsa_keypair_id const & key_name, rsa_pub_key const & pub, id const & nonce) { if (use_transport_auth) cmd.write_hello_cmd(key_name, pub, nonce); else cmd.write_hello_cmd(key_name, rsa_pub_key(), nonce); write_netcmd_and_try_flush(cmd); } void session::queue_anonymous_cmd(protocol_role role, globish const & include_pattern, globish const & exclude_pattern, id const & nonce2) { netcmd cmd; rsa_oaep_sha_data hmac_key_encrypted; if (use_transport_auth) project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted); cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern, hmac_key_encrypted); write_netcmd_and_try_flush(cmd); set_session_key(nonce2()); } void session::queue_auth_cmd(protocol_role role, globish const & include_pattern, globish const & exclude_pattern, id const & client, id const & nonce1, id const & nonce2, rsa_sha1_signature const & signature) { netcmd cmd; rsa_oaep_sha_data hmac_key_encrypted; I(use_transport_auth); project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted); cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client, nonce1, hmac_key_encrypted, signature); write_netcmd_and_try_flush(cmd); 
set_session_key(nonce2()); } void session::queue_confirm_cmd() { netcmd cmd; cmd.write_confirm_cmd(); write_netcmd_and_try_flush(cmd); } void session::queue_refine_cmd(refinement_type ty, merkle_node const & node) { string typestr; hexenc<prefix> hpref; node.get_hex_prefix(hpref); netcmd_item_type_to_string(node.type, typestr); L(FL("queueing refinement %s of %s node '%s', level %d") % (ty == refinement_query ? "query" : "response") % typestr % hpref() % static_cast<int>(node.level)); netcmd cmd; cmd.write_refine_cmd(ty, node); write_netcmd_and_try_flush(cmd); } void session::queue_data_cmd(netcmd_item_type type, id const & item, string const & dat) { string typestr; netcmd_item_type_to_string(type, typestr); hexenc<id> hid; if (global_sanity.debug_p()) encode_hexenc(item, hid); if (role == sink_role) { L(FL("not queueing %s data for '%s' as we are in pure sink role") % typestr % hid()); return; } L(FL("queueing %d bytes of data for %s item '%s'") % dat.size() % typestr % hid()); netcmd cmd; // TODO: This pair of functions will make two copies of a large // file, the first in cmd.write_data_cmd, and the second in // write_netcmd_and_try_flush when the data is copied from the // cmd.payload variable to the string buffer for output. This // double copy should be collapsed out, it may be better to use // a string_queue for output as well as input, as that will reduce // the amount of mallocs that happen when the string queue is large // enough to just store the data. cmd.write_data_cmd(type, item, dat); write_netcmd_and_try_flush(cmd); note_item_sent(type, item); } void session::queue_delta_cmd(netcmd_item_type type