// Below is the file 'netsync.cc' from this revision. You can also download the file.
// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil; c-basic-offset: 2 -*- // copyright (C) 2004 graydon hoare <graydon@pobox.com> // all rights reserved. // licensed to the public under the terms of the GNU GPL (>= 2) // see the file COPYING for details #include <map> #include <string> #include <memory> #include <list> #include <deque> #include <stack> #include <time.h> #include <boost/lexical_cast.hpp> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> #include <boost/regex.hpp> #include "app_state.hh" #include "cert.hh" #include "constants.hh" #include "enumerator.hh" #include "keys.hh" #include "merkle_tree.hh" #include "netcmd.hh" #include "netio.hh" #include "netsync.hh" #include "numeric_vocab.hh" #include "packet.hh" #include "refiner.hh" #include "sanity.hh" #include "transforms.hh" #include "ui.hh" #include "xdelta.hh" #include "epoch.hh" #include "platform.hh" #include "hmac.hh" #include "globish.hh" #include "botan/botan.h" #include "netxx/address.h" #include "netxx/peer.h" #include "netxx/probe.h" #include "netxx/socket.h" #include "netxx/sockopt.h" #include "netxx/stream.h" #include "netxx/streamserver.h" #include "netxx/timeout.h" // TODO: things to do that will break protocol compatibility // -- need some way to upgrade anonymous to keyed pull, without user having // to explicitly specify which they want // just having a way to respond "access denied, try again" might work // but perhaps better to have the anonymous command include a note "I // _could_ use key <...> if you prefer", and if that would lead to more // access, could reply "I do prefer". (Does this lead to too much // information exposure? Allows anonymous people to probe what branches // a key has access to.) // -- "warning" packet type? // -- Richard Levitte wants, when you (e.g.) request '*' but don't access to // all of it, you just get the parts you have access to (maybe with // warnings about skipped branches). 
to do this right, should have a way // for the server to send back to the client "right, you're not getting // the following branches: ...", so the client will not include them in // its merkle trie. // -- add some sort of vhost field to the client's first packet, saying who // they expect to talk to // -- apparently we have a IANA approved port: 4691. I guess we should // switch to using that. // // This is the "new" network synchronization (netsync) system in // monotone. It is based on synchronizing pairs of merkle trees over an // interactive connection. // // A netsync process between peers treats each peer as either a source, a // sink, or both. When a peer is only a source, it will not write any new // items to its database. when a peer is only a sink, it will not send any // items from its database. When a peer is both a source and sink, it may // send and write items freely. // // The post-state of a netsync is that each sink contains a superset of the // items in its corresponding source; when peers are behaving as both // source and sink, this means that the post-state of the sync is for the // peers to have identical item sets. // // // Data structure // -------------- // // Each node in a merkle tree contains a fixed number of slots. this number // is derived from a global parameter of the protocol -- the tree fanout -- // such that the number of slots is 2^fanout. For now we will assume that // fanout is 4 thus there are 16 slots in a node, because this makes // illustration easier. The other parameter of the protocol is the size of // a hash; we use SHA1 so the hash is 20 bytes (160 bits) long. // // Each slot in a merkle tree node is in one of 3 states: // // - empty // - leaf // - subtree // // In addition, each leaf contains a hash code which identifies an element // of the set being synchronized. Each subtree slot contains a hash code of // the node immediately beneath it in the merkle tree. Empty slots contain // no hash codes. 
//
// Since empty slots have no hash code, they are represented implicitly by
// a bitmap at the head of each merkle tree node. As an additional
// integrity check, each merkle tree node contains a label indicating its
// prefix in the tree, and a hash of its own contents.
//
// In total, then, the byte-level representation of a <160,4> merkle tree
// node is as follows:
//
//      20 bytes   - hash of the remaining bytes in the node
//       1 byte    - type of this node (manifest, file, key, mcert, fcert)
//     1-N bytes   - level of this node in the tree (0 == "root", uleb128)
//    0-20 bytes   - the prefix of this node, 4 bits * level,
//                   rounded up to a byte
//     1-N bytes   - number of leaves under this node (uleb128)
//       4 bytes   - slot-state bitmap of the node
//   0-320 bytes   - between 0 and 16 live slots in the node
//
// So, in the worst case such a node is 367 bytes, with these parameters.
//
//
// Protocol
// --------
//
// The protocol is a binary command-packet system over TCP; each packet
// consists of a single byte which identifies the protocol version, a byte
// which identifies the command name inside that version, a size_t sent as
// a uleb128 indicating the length of the packet, that many bytes of
// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
// 20-byte random key chosen by the client after authentication (discussed
// below). Decoding involves simply buffering until a sufficient number of
// bytes are received, then advancing the buffer pointer. Any time an
// integrity check (the HMAC) fails, the protocol is assumed to have lost
// synchronization, and the connection is dropped. The parties are free to
// drop the TCP stream at any point, if too much data is received or too
// much idle time passes; no commitments or transactions are made.
//
//
// Authentication and setup
// ------------------------
//
// The exchange begins in a non-authenticated state. The server sends a
// "hello <id> <nonce>" command, which identifies the server's RSA key and
// issues a nonce which must be used for a subsequent authentication.
//
// The client then responds with either:
//
// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
// role it wishes to play in the synchronization, identifies the pattern it
// wishes to sync with, signs the previous nonce with its own key, and informs
// the server of the HMAC key it wishes to use for this session (encrypted
// with the server's public key); or
//
// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
// <hmac key>" command, which identifies the role it wishes to play in the
// synchronization, the pattern it wishes to sync with, and the HMAC key it
// wishes to use for this session (also encrypted with the server's public
// key).
//
// The server then replies with a "confirm" command, which contains no
// other data but will only have the correct HMAC integrity code if the
// server received and properly decrypted the HMAC key offered by the
// client. This transitions the peers into an authenticated state and
// begins epoch refinement. If epoch refinement and epoch transmission
// succeed, the peers switch to data refinement and data transmission.
//
//
// Refinement
// ----------
//
// Refinement is executed by "refiners"; there is a refiner for each
// set of 'items' being exchanged: epochs, keys, certs, and revisions.
// When refinement starts, each party knows only their own set of
// items; when refinement completes, each party has learned of the
// complete set of items it needs to send, and a count of items it's
// expecting to receive.
//
// For more details on the refinement process, see refiner.cc.
//
//
// Transmission
// ------------
//
// Once the set of items to send has been determined (for keys, certs, and
// revisions) each peer switches into a transmission mode. This mode
// involves walking the revision graph in ancestry-order and sending all
// the items the local peer has which the remote one does not. Since the
// remote and local peers both know all the items which need to be
// transferred (they learned during refinement) they know what to wait for
// and what to send. The mechanisms of the transmission phase (notably,
// enumerator.cc) simply ensure that things are sent in the proper order,
// and without over-filling the output buffer too much.
//
//
// Shutdown
// --------
//
// After transmission completes, one special command, "bye", is used to
// shut down a connection gracefully. The shutdown sequence based on "bye"
// commands is documented below in session::process_bye_cmd.
//
//
// Note on epochs
// --------------
//
// One refinement and transmission phase precedes all the others: epochs.
// Epochs are exchanged and compared in order to be sure that further
// refinement and transmission (on certs and revisions) makes sense; they
// are a sort of "immune system" to prevent incompatible databases (say
// between rebuilds due to bugs in monotone) from cross-contaminating. The
// later refinements are only kicked off *after* all epochs are received
// and compare correctly.
//
//
// Note on dense coding
// --------------------
//
// This protocol is "raw binary" (non-text) because coding density is
// actually important here, and each packet consists of very
// information-dense material that you wouldn't have a hope of typing in,
// interpreting manually anyways.
// using namespace std; using boost::shared_ptr; using boost::lexical_cast; static inline void require(bool check, string const & context) { if (!check) throw bad_decode(F("check of '%s' failed") % context); } struct netsync_error { string msg; netsync_error(string const & s): msg(s) {} }; struct session: public refiner_callbacks, public enumerator_callbacks { protocol_role role; protocol_voice const voice; utf8 const & our_include_pattern; utf8 const & our_exclude_pattern; globish_matcher our_matcher; app_state & app; string peer_id; Netxx::socket_type fd; Netxx::Stream str; string_queue inbuf; // deque of pair<string data, size_t cur_pos> deque< pair<string,size_t> > outbuf; // the total data stored in outbuf - this is // used as a valve to stop too much data // backing up size_t outbuf_size; netcmd cmd; bool armed; bool arm(); id remote_peer_key_hash; rsa_keypair_id remote_peer_key_name; netsync_session_key session_key; chained_hmac read_hmac; chained_hmac write_hmac; bool authenticated; time_t last_io_time; auto_ptr<ticker> byte_in_ticker; auto_ptr<ticker> byte_out_ticker; auto_ptr<ticker> cert_in_ticker; auto_ptr<ticker> cert_out_ticker; auto_ptr<ticker> revision_in_ticker; auto_ptr<ticker> revision_out_ticker; vector<revision_id> written_revisions; vector<rsa_keypair_id> written_keys; vector<cert> written_certs; id saved_nonce; packet_db_writer dbw; enum { working_state, shutdown_state, confirmed_state } protocol_state; bool encountered_error; bool set_totals; // Interface to refinement. refiner epoch_refiner; refiner key_refiner; refiner cert_refiner; refiner rev_refiner; // Interface to ancestry grovelling. revision_enumerator rev_enumerator; // Enumerator_callbacks methods. 
set<file_id> file_items_sent; bool process_this_rev(revision_id const & rev); bool queue_this_cert(hexenc<id> const & c); bool queue_this_file(hexenc<id> const & f); void note_file_data(file_id const & f); void note_file_delta(file_id const & src, file_id const & dst); void note_rev(revision_id const & rev); void note_cert(hexenc<id> const & c); session(protocol_role role, protocol_voice voice, utf8 const & our_include_pattern, utf8 const & our_exclude_pattern, app_state & app, string const & peer, Netxx::socket_type sock, Netxx::Timeout const & to); virtual ~session(); void rev_written_callback(revision_id rid); void key_written_callback(rsa_keypair_id kid); void cert_written_callback(cert const & c); id mk_nonce(); void mark_recent_io(); void set_session_key(string const & key); void set_session_key(rsa_oaep_sha_data const & key_encrypted); void setup_client_tickers(); bool done_all_refinements(); bool queued_all_items(); bool received_all_items(); bool finished_working(); void maybe_step(); void maybe_say_goodbye(transaction_guard & guard); void note_item_arrived(netcmd_item_type ty, id const & i); void maybe_note_epochs_finished(); void note_item_sent(netcmd_item_type ty, id const & i); Netxx::Probe::ready_type which_events() const; bool read_some(); bool write_some(); void error(string const & errmsg); void write_netcmd_and_try_flush(netcmd const & cmd); // Outgoing queue-writers. 
void queue_bye_cmd(u8 phase); void queue_error_cmd(string const & errmsg); void queue_done_cmd(netcmd_item_type type, size_t n_items); void queue_hello_cmd(rsa_keypair_id const & key_name, base64<rsa_pub_key> const & pub_encoded, id const & nonce); void queue_anonymous_cmd(protocol_role role, utf8 const & include_pattern, utf8 const & exclude_pattern, id const & nonce2, base64<rsa_pub_key> server_key_encoded); void queue_auth_cmd(protocol_role role, utf8 const & include_pattern, utf8 const & exclude_pattern, id const & client, id const & nonce1, id const & nonce2, string const & signature, base64<rsa_pub_key> server_key_encoded); void queue_confirm_cmd(); void queue_refine_cmd(refinement_type ty, merkle_node const & node); void queue_data_cmd(netcmd_item_type type, id const & item, string const & dat); void queue_delta_cmd(netcmd_item_type type, id const & base, id const & ident, delta const & del); // Incoming dispatch-called methods. bool process_error_cmd(string const & errmsg); bool process_hello_cmd(rsa_keypair_id const & server_keyname, rsa_pub_key const & server_key, id const & nonce); bool process_bye_cmd(u8 phase, transaction_guard & guard); bool process_anonymous_cmd(protocol_role role, utf8 const & their_include_pattern, utf8 const & their_exclude_pattern); bool process_auth_cmd(protocol_role role, utf8 const & their_include_pattern, utf8 const & their_exclude_pattern, id const & client, id const & nonce1, string const & signature); bool process_refine_cmd(refinement_type ty, merkle_node const & node); bool process_done_cmd(netcmd_item_type type, size_t n_items); bool process_data_cmd(netcmd_item_type type, id const & item, string const & dat); bool process_delta_cmd(netcmd_item_type type, id const & base, id const & ident, delta const & del); bool process_usher_cmd(utf8 const & msg); // The incoming dispatcher. bool dispatch_payload(netcmd const & cmd, transaction_guard & guard); // Various helpers. 
void respond_to_confirm_cmd(); void rebuild_merkle_trees(app_state & app, set<utf8> const & branches); void send_all_data(netcmd_item_type ty, set<id> const & items); void begin_service(); bool process(transaction_guard & guard); }; session::session(protocol_role role, protocol_voice voice, utf8 const & our_include_pattern, utf8 const & our_exclude_pattern, app_state & app, string const & peer, Netxx::socket_type sock, Netxx::Timeout const & to) : role(role), voice(voice), our_include_pattern(our_include_pattern), our_exclude_pattern(our_exclude_pattern), our_matcher(our_include_pattern, our_exclude_pattern), app(app), peer_id(peer), fd(sock), str(sock, to), inbuf(), outbuf_size(0), armed(false), remote_peer_key_hash(""), remote_peer_key_name(""), session_key(constants::netsync_key_initializer), read_hmac(constants::netsync_key_initializer), write_hmac(constants::netsync_key_initializer), authenticated(false), last_io_time(::time(NULL)), byte_in_ticker(NULL), byte_out_ticker(NULL), cert_in_ticker(NULL), cert_out_ticker(NULL), revision_in_ticker(NULL), revision_out_ticker(NULL), saved_nonce(""), dbw(app), protocol_state(working_state), encountered_error(false), set_totals(false), epoch_refiner(epoch_item, voice, *this), key_refiner(key_item, voice, *this), cert_refiner(cert_item, voice, *this), rev_refiner(revision_item, voice, *this), rev_enumerator(*this, app) { dbw.set_on_revision_written(boost::bind(&session::rev_written_callback, this, _1)); dbw.set_on_cert_written(boost::bind(&session::cert_written_callback, this, _1)); dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback, this, _1)); } session::~session() { vector<cert> unattached_certs; map<revision_id, vector<cert> > revcerts; for (vector<revision_id>::iterator i = written_revisions.begin(); i != written_revisions.end(); ++i) revcerts.insert(make_pair(*i, vector<cert>())); for (vector<cert>::iterator i = written_certs.begin(); i != written_certs.end(); ++i) { map<revision_id, vector<cert> 
>::iterator j; j = revcerts.find(i->ident); if (j == revcerts.end()) unattached_certs.push_back(*i); else j->second.push_back(*i); } //Keys for (vector<rsa_keypair_id>::iterator i = written_keys.begin(); i != written_keys.end(); ++i) { app.lua.hook_note_netsync_pubkey_received(*i); } //Revisions for (vector<revision_id>::iterator i = written_revisions.begin(); i != written_revisions.end(); ++i) { vector<cert> & ctmp(revcerts[*i]); set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs; for (vector<cert>::const_iterator j = ctmp.begin(); j != ctmp.end(); ++j) { cert_value vtmp; decode_base64(j->value, vtmp); certs.insert(make_pair(j->key, make_pair(j->name, vtmp))); } revision_data rdat; app.db.get_revision(*i, rdat); app.lua.hook_note_netsync_revision_received(*i, rdat, certs); } //Certs (not attached to a new revision) for (vector<cert>::iterator i = unattached_certs.begin(); i != unattached_certs.end(); ++i) { cert_value tmp; decode_base64(i->value, tmp); app.lua.hook_note_netsync_cert_received(i->ident, i->key, i->name, tmp); } } bool session::process_this_rev(revision_id const & rev) { id item; decode_hexenc(rev.inner(), item); return (rev_refiner.items_to_send.find(item) != rev_refiner.items_to_send.end()); } bool session::queue_this_cert(hexenc<id> const & c) { id item; decode_hexenc(c, item); return (cert_refiner.items_to_send.find(item) != cert_refiner.items_to_send.end()); } bool session::queue_this_file(hexenc<id> const & f) { return file_items_sent.find(f) == file_items_sent.end(); } void session::note_file_data(file_id const & f) { if (role == sink_role) return; file_data fd; id item; decode_hexenc(f.inner(), item); app.db.get_file_version(f, fd); queue_data_cmd(file_item, item, fd.inner()()); file_items_sent.insert(f); } void session::note_file_delta(file_id const & src, file_id const & dst) { if (role == sink_role) return; file_delta del; app.db.get_file_delta(dst, src, del); id src_id, dst_id; decode_hexenc(src.inner(), src_id); 
decode_hexenc(dst.inner(), dst_id); queue_delta_cmd(file_item, src_id, dst_id, del.inner()); file_items_sent.insert(dst); } void session::note_rev(revision_id const & rev) { if (role == sink_role) return; revision_set rs; id item; decode_hexenc(rev.inner(), item); app.db.get_revision(rev, rs); data tmp; write_revision_set(rs, tmp); queue_data_cmd(revision_item, item, tmp()); } void session::note_cert(hexenc<id> const & c) { if (role == sink_role) return; id item; decode_hexenc(c, item); revision<cert> cert; string str; app.db.get_revision_cert(c, cert); write_cert(cert.inner(), str); queue_data_cmd(cert_item, item, str); } void session::rev_written_callback(revision_id rid) { written_revisions.push_back(rid); } void session::key_written_callback(rsa_keypair_id kid) { written_keys.push_back(kid); } void session::cert_written_callback(cert const & c) { written_certs.push_back(c); } id session::mk_nonce() { I(this->saved_nonce().size() == 0); char buf[constants::merkle_hash_length_in_bytes]; Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf), constants::merkle_hash_length_in_bytes); this->saved_nonce = string(buf, buf + constants::merkle_hash_length_in_bytes); I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes); return this->saved_nonce; } void session::mark_recent_io() { last_io_time = ::time(NULL); } void session::set_session_key(string const & key) { session_key = netsync_session_key(key); read_hmac.set_key(session_key); write_hmac.set_key(session_key); } void session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted) { keypair our_kp; load_key_pair(app, app.signing_key, our_kp); string hmac_key; decrypt_rsa(app.lua, app.signing_key, our_kp.priv, hmac_key_encrypted, hmac_key); set_session_key(hmac_key); } void session::setup_client_tickers() { // xgettext: please use short message and try to avoid multibytes chars byte_in_ticker.reset(new ticker(_("bytes in"), ">", 1024, true)); // xgettext: please use short message 
and try to avoid multibytes chars byte_out_ticker.reset(new ticker(_("bytes out"), "<", 1024, true)); if (role == sink_role) { // xgettext: please use short message and try to avoid multibytes chars cert_in_ticker.reset(new ticker(_("certs in"), "c", 3)); // xgettext: please use short message and try to avoid multibytes chars revision_in_ticker.reset(new ticker(_("revs in"), "r", 1)); } else if (role == source_role) { // xgettext: please use short message and try to avoid multibytes chars cert_out_ticker.reset(new ticker(_("certs out"), "C", 3)); // xgettext: please use short message and try to avoid multibytes chars revision_out_ticker.reset(new ticker(_("revs out"), "R", 1)); } else { I(role == source_and_sink_role); // xgettext: please use short message and try to avoid multibytes chars revision_in_ticker.reset(new ticker(_("revs in"), "r", 1)); // xgettext: please use short message and try to avoid multibytes chars revision_out_ticker.reset(new ticker(_("revs out"), "R", 1)); } } bool session::done_all_refinements() { bool all = rev_refiner.done && cert_refiner.done && key_refiner.done && epoch_refiner.done; if (all && !set_totals) { if (cert_out_ticker.get()) cert_out_ticker->set_total(cert_refiner.items_to_send.size()); if (revision_out_ticker.get()) revision_out_ticker->set_total(rev_refiner.items_to_send.size()); if (cert_in_ticker.get()) cert_in_ticker->set_total(cert_refiner.items_to_receive); if (revision_in_ticker.get()) revision_in_ticker->set_total(rev_refiner.items_to_receive); set_totals = true; } return all; } bool session::received_all_items() { if (role == source_role) return true; bool all = rev_refiner.items_to_receive == 0 && cert_refiner.items_to_receive == 0 && key_refiner.items_to_receive == 0 && epoch_refiner.items_to_receive == 0; return all; } bool session::finished_working() { bool all = done_all_refinements() && received_all_items() && queued_all_items() && rev_enumerator.done(); return all; } bool session::queued_all_items() { if 
(role == sink_role) return true; bool all = rev_refiner.items_to_send.empty() && cert_refiner.items_to_send.empty() && key_refiner.items_to_send.empty() && epoch_refiner.items_to_send.empty(); return all; } void session::maybe_note_epochs_finished() { // Maybe there are outstanding epoch requests. if (!epoch_refiner.items_to_receive == 0) return; // And maybe we haven't even finished the refinement. if (!epoch_refiner.done) return; // If we ran into an error -- say a mismatched epoch -- don't do any // further refinements. if (encountered_error) return; // But otherwise, we're ready to go. Start the next // set of refinements. key_refiner.begin_refinement(); cert_refiner.begin_refinement(); rev_refiner.begin_refinement(); } static void decrement_if_nonzero(netcmd_item_type ty, size_t & n) { if (n == 0) { string typestr; netcmd_item_type_to_string(ty, typestr); E(false, F("underflow on count of %s items to receive") % typestr); } --n; } void session::note_item_arrived(netcmd_item_type ty, id const & ident) { switch (ty) { case cert_item: decrement_if_nonzero(ty, cert_refiner.items_to_receive); if (cert_in_ticker.get() != NULL) ++(*cert_in_ticker); break; case revision_item: decrement_if_nonzero(ty, rev_refiner.items_to_receive); if (revision_in_ticker.get() != NULL) ++(*revision_in_ticker); break; case key_item: decrement_if_nonzero(ty, key_refiner.items_to_receive); break; case epoch_item: decrement_if_nonzero(ty, epoch_refiner.items_to_receive); break; default: // No ticker for other things. 
break; } } void session::note_item_sent(netcmd_item_type ty, id const & ident) { switch (ty) { case cert_item: cert_refiner.items_to_send.erase(ident); if (cert_out_ticker.get() != NULL) ++(*cert_out_ticker); break; case revision_item: rev_refiner.items_to_send.erase(ident); if (revision_out_ticker.get() != NULL) ++(*revision_out_ticker); break; case key_item: key_refiner.items_to_send.erase(ident); break; case epoch_item: epoch_refiner.items_to_send.erase(ident); break; default: // No ticker for other things. break; } } void session::write_netcmd_and_try_flush(netcmd const & cmd) { if (!encountered_error) { string buf; cmd.write(buf, write_hmac); outbuf.push_back(make_pair(buf, 0)); outbuf_size += buf.size(); } else L(FL("dropping outgoing netcmd (because we're in error unwind mode)\n")); // FIXME: this helps keep the protocol pipeline full but it seems to // interfere with initial and final sequences. careful with it. // write_some(); // read_some(); } // This method triggers a special "error unwind" mode to netsync. In this // mode, all received data is ignored, and no new data is queued. We simply // stay connected long enough for the current write buffer to be flushed, to // ensure that our peer receives the error message. // Affects read_some, write_some, and process . void session::error(std::string const & errmsg) { throw netsync_error(errmsg); } Netxx::Probe::ready_type session::which_events() const { // Only ask to read if we're not armed. 
if (outbuf.empty()) { if (inbuf.size() < constants::netcmd_maxsz && !armed) return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd; else return Netxx::Probe::ready_oobd; } else { if (inbuf.size() < constants::netcmd_maxsz && !armed) return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd; else return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd; } } bool session::read_some() { I(inbuf.size() < constants::netcmd_maxsz); char tmp[constants::bufsz]; Netxx::signed_size_type count = str.read(tmp, sizeof(tmp)); if (count > 0) { L(FL("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id); if (encountered_error) { L(FL("in error unwind mode, so throwing them into the bit bucket\n")); return true; } inbuf.append(tmp,count); mark_recent_io(); if (byte_in_ticker.get() != NULL) (*byte_in_ticker) += count; return true; } else return false; } bool session::write_some() { I(!outbuf.empty()); size_t writelen = outbuf.front().first.size() - outbuf.front().second; Netxx::signed_size_type count = str.write(outbuf.front().first.data() + outbuf.front().second, std::min(writelen, constants::bufsz)); if (count > 0) { if ((size_t)count == writelen) { outbuf_size -= outbuf.front().first.size(); outbuf.pop_front(); } else { outbuf.front().second += count; } L(FL("wrote %d bytes to fd %d (peer %s)\n") % count % fd % peer_id); mark_recent_io(); if (byte_out_ticker.get() != NULL) (*byte_out_ticker) += count; if (encountered_error && outbuf.empty()) { // we've flushed our error message, so it's time to get out. 
L(FL("finished flushing output queue in error unwind mode, disconnecting\n")); return false; } return true; } else return false; } // senders void session::queue_error_cmd(string const & errmsg) { L(FL("queueing 'error' command\n")); netcmd cmd; cmd.write_error_cmd(errmsg); write_netcmd_and_try_flush(cmd); } void session::queue_bye_cmd(u8 phase) { L(FL("queueing 'bye' command, phase %d\n") % static_cast<size_t>(phase)); netcmd cmd; cmd.write_bye_cmd(phase); write_netcmd_and_try_flush(cmd); } void session::queue_done_cmd(netcmd_item_type type, size_t n_items) { string typestr; netcmd_item_type_to_string(type, typestr); L(FL("queueing 'done' command for %s (%d items)\n") % typestr % n_items); netcmd cmd; cmd.write_done_cmd(type, n_items); write_netcmd_and_try_flush(cmd); } void session::queue_hello_cmd(rsa_keypair_id const & key_name, base64<rsa_pub_key> const & pub_encoded, id const & nonce) { rsa_pub_key pub; decode_base64(pub_encoded, pub); cmd.write_hello_cmd(key_name, pub, nonce); write_netcmd_and_try_flush(cmd); } void session::queue_anonymous_cmd(protocol_role role, utf8 const & include_pattern, utf8 const & exclude_pattern, id const & nonce2, base64<rsa_pub_key> server_key_encoded) { netcmd cmd; rsa_oaep_sha_data hmac_key_encrypted; encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded, nonce2(), hmac_key_encrypted); cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern, hmac_key_encrypted); write_netcmd_and_try_flush(cmd); set_session_key(nonce2()); } void session::queue_auth_cmd(protocol_role role, utf8 const & include_pattern, utf8 const & exclude_pattern, id const & client, id const & nonce1, id const & nonce2, string const & signature, base64<rsa_pub_key> server_key_encoded) { netcmd cmd; rsa_oaep_sha_data hmac_key_encrypted; encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded, nonce2(), hmac_key_encrypted); cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client, nonce1, hmac_key_encrypted, signature); 
write_netcmd_and_try_flush(cmd); set_session_key(nonce2()); } void session::queue_confirm_cmd() { netcmd cmd; cmd.write_confirm_cmd(); write_netcmd_and_try_flush(cmd); } void session::queue_refine_cmd(refinement_type ty, merkle_node const & node) { string typestr; hexenc<prefix> hpref; node.get_hex_prefix(hpref); netcmd_item_type_to_string(node.type, typestr); L(FL("queueing refinement %s of %s node '%s', level %d\n") % (ty == refinement_query ? "query" : "response") % typestr % hpref % static_cast<int>(node.level)); netcmd cmd; cmd.write_refine_cmd(ty, node); write_netcmd_and_try_flush(cmd); } void session::queue_data_cmd(netcmd_item_type type, id const & item, string const & dat) { string typestr; netcmd_item_type_to_string(type, typestr); hexenc<id> hid; encode_hexenc(item, hid); if (role == sink_role) { L(FL("not queueing %s data for '%s' as we are in pure sink role\n") % typestr % hid); return; } L(FL("queueing %d bytes of data for %s item '%s'\n") % dat.size() % typestr % hid); netcmd cmd; // TODO: This pair of functions will make two copies of a large // file, the first in cmd.write_data_cmd, and the second in // write_netcmd_and_try_flush when the data is copied from the // cmd.payload variable to the string buffer for output. This // double copy should be collapsed out, it may be better to use // a string_queue for output as well as input, as that will reduce // the amount of mallocs that happen when the string queue is large // enough to just store the data. cmd.write_data_cmd(type, item, dat); write_netcmd_and_try_flush(cmd); note_item_sent(type, item); } void session::queue_delta_cmd(netcmd_item_type type, id const & base, id const & ident, delta const & del) { I(type == file_item); I(! 
del().empty() || ident == base); string typestr; netcmd_item_type_to_string(type, typestr); hexenc<id> base_hid; encode_hexenc(base, base_hid); hexenc<id> ident_hid; encode_hexenc(ident, ident_hid); if (role == sink_role) { L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role\n") % typestr % base_hid % ident_hid); return; } L(FL("queueing %s delta '%s' -> '%s'\n") % typestr % base_hid % ident_hid); netcmd cmd; cmd.write_delta_cmd(type, base, ident, del); write_netcmd_and_try_flush(cmd); note_item_sent(type, ident); } // processors bool session::process_error_cmd(string const & errmsg) { throw bad_decode(F("received network error: %s") % errmsg); } void get_branches(app_state & app, vector<string> & names) { app.db.get_branches(names); sort(names.begin(), names.end()); } static const var_domain known_servers_domain = var_domain("known-servers"); bool session::process_hello_cmd(rsa_keypair_id const & their_keyname, rsa_pub_key const & their_key, id const & nonce) { I(this->remote_peer_key_hash().size() == 0); I(this->saved_nonce().size() == 0); hexenc<id> their_key_hash; base64<rsa_pub_key> their_key_encoded; encode_base64(their_key, their_key_encoded); key_hash_code(their_keyname, their_key_encoded, their_key_hash); L(FL("server key has name %s, hash %s\n") % their_keyname % their_key_hash); var_key their_key_key(known_servers_domain, var_name(peer_id)); if (app.db.var_exists(their_key_key)) { var_value expected_key_hash; app.db.get_var(their_key_key, expected_key_hash); if (expected_key_hash() != their_key_hash()) { P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n" "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n" "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n" "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n" "it is also possible that the server key has just been changed\n" "remote host sent key %s\n" "I expected %s\n" "'monotone unset %s %s' overrides this check\n") % their_key_hash % expected_key_hash % 
their_key_key.first % their_key_key.second); E(false, F("server key changed")); } } else { P(F("first time connecting to server %s\n" "I'll assume it's really them, but you might want to double-check\n" "their key's fingerprint: %s\n") % peer_id % their_key_hash); app.db.set_var(their_key_key, var_value(their_key_hash())); } if (!app.db.public_key_exists(their_key_hash)) { W(F("saving public key for %s to database\n") % their_keyname); app.db.put_key(their_keyname, their_key_encoded); } { hexenc<id> hnonce; encode_hexenc(nonce, hnonce); L(FL("received 'hello' netcmd from server '%s' with nonce '%s'\n") % their_key_hash % hnonce); } I(app.db.public_key_exists(their_key_hash)); // save their identity { id their_key_hash_decoded; decode_hexenc(their_key_hash, their_key_hash_decoded); this->remote_peer_key_hash = their_key_hash_decoded; } // clients always include in the synchronization set, every branch that the // user requested vector<string> branchnames; set<utf8> ok_branches; get_branches(app, branchnames); for (vector<string>::const_iterator i = branchnames.begin(); i != branchnames.end(); i++) { if (our_matcher(*i)) ok_branches.insert(utf8(*i)); } rebuild_merkle_trees(app, ok_branches); setup_client_tickers(); if (app.signing_key() != "") { // get our key pair keypair our_kp; load_key_pair(app, app.signing_key, our_kp); // get the hash identifier for our pubkey hexenc<id> our_key_hash; id our_key_hash_raw; key_hash_code(app.signing_key, our_kp.pub, our_key_hash); decode_hexenc(our_key_hash, our_key_hash_raw); // make a signature base64<rsa_sha1_signature> sig; rsa_sha1_signature sig_raw; make_signature(app, app.signing_key, our_kp.priv, nonce(), sig); decode_base64(sig, sig_raw); // make a new nonce of our own and send off the 'auth' queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern, our_key_hash_raw, nonce, mk_nonce(), sig_raw(), their_key_encoded); } else { queue_anonymous_cmd(this->role, our_include_pattern, our_exclude_pattern, mk_nonce(), 
their_key_encoded); } return true; } bool session::process_anonymous_cmd(protocol_role role, utf8 const & their_include_pattern, utf8 const & their_exclude_pattern) { // Internally netsync thinks in terms of sources and sinks. Users like // thinking of repositories as "readonly", "readwrite", or "writeonly". // // We therefore use the read/write terminology when dealing with the UI: // if the user asks to run a "read only" service, this means they are // willing to be a source but not a sink. // // nb: The "role" here is the role the *client* wants to play // so we need to check that the opposite role is allowed for us, // in our this->role field. // // Client must be a sink and server must be a source (anonymous // read-only). if (role != sink_role) { W(F("rejected attempt at anonymous connection for write\n")); this->saved_nonce = id(""); return false; } if (this->role != source_role && this->role != source_and_sink_role