Below is the file 'netsync.cc' from this revision. You can also download the file.

// Copyright (C) 2008 Stephen Leake <stephen_leake@stephe-leake.org>
// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
//
// This program is made available under the GNU GPL version 2.0 or
// greater. See the accompanying file COPYING for details.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the
// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
// PURPOSE.

#include "base.hh"
#include <map>
#include <cstdlib>
#include <memory>
#include <list>
#include <deque>
#include <stack>

#include <time.h>

#include "lexical_cast.hh"
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>

#include "lua_hooks.hh"
#include "key_store.hh"
#include "project.hh"
#include "database.hh"
#include "cert.hh"
#include "constants.hh"
#include "enumerator.hh"
#include "keys.hh"
#include "lua.hh"
#include "merkle_tree.hh"
#include "netcmd.hh"
#include "netio.hh"
#include "numeric_vocab.hh"
#include "refiner.hh"
#include "revision.hh"
#include "sanity.hh"
#include "transforms.hh"
#include "ui.hh"
#include "xdelta.hh"
#include "epoch.hh"
#include "platform.hh"
#include "hmac.hh"
#include "globish.hh"
#include "uri.hh"
#include "options.hh"

#include "botan/botan.h"

#include "netxx/address.h"
#include "netxx/peer.h"
#include "netxx/probe.h"
#include "netxx/socket.h"
#include "netxx/sockopt.h"
#include "netxx/stream.h"
#include "netxx/streamserver.h"
#include "netxx/timeout.h"
#include "netxx_pipe.hh"
// TODO: things to do that will break protocol compatibility
//   -- need some way to upgrade anonymous to keyed pull, without user having
//      to explicitly specify which they want
//      just having a way to respond "access denied, try again" might work
//      but perhaps better to have the anonymous command include a note "I
//      _could_ use key <...> if you prefer", and if that would lead to more
//      access, could reply "I do prefer".  (Does this lead to too much
//      information exposure?  Allows anonymous people to probe what branches
//      a key has access to.)
//   -- "warning" packet type?
//   -- Richard Levitte wants, when you (e.g.) request '*' but don't have
//      access to all of it, you just get the parts you have access to
//      (maybe with warnings about skipped branches).  to do this right,
//      should have a way for the server to send back to the client "right,
//      you're not getting the following branches: ...", so the client will
//      not include them in its merkle trie.
//   -- add some sort of vhost field to the client's first packet, saying who
//      they expect to talk to

//
// This is the "new" network synchronization (netsync) system in
// monotone. It is based on synchronizing pairs of merkle trees over an
// interactive connection.
//
// A netsync process between peers treats each peer as either a source, a
// sink, or both. When a peer is only a source, it will not write any new
// items to its database. when a peer is only a sink, it will not send any
// items from its database. When a peer is both a source and sink, it may
// send and write items freely.
//
// The post-state of a netsync is that each sink contains a superset of the
// items in its corresponding source; when peers are behaving as both
// source and sink, this means that the post-state of the sync is for the
// peers to have identical item sets.
//
//
// Data structure
// --------------
//
// Each node in a merkle tree contains a fixed number of slots. this number
// is derived from a global parameter of the protocol -- the tree fanout --
// such that the number of slots is 2^fanout. For now we will assume that
// fanout is 4 thus there are 16 slots in a node, because this makes
// illustration easier. The other parameter of the protocol is the size of
// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
//
// Each slot in a merkle tree node is in one of 3 states:
//
//   - empty
//   - leaf
//   - subtree
//
// In addition, each leaf contains a hash code which identifies an element
// of the set being synchronized. Each subtree slot contains a hash code of
// the node immediately beneath it in the merkle tree. Empty slots contain
// no hash codes.
//
// Since empty slots have no hash code, they are represented implicitly by
// a bitmap at the head of each merkle tree node. As an additional
// integrity check, each merkle tree node contains a label indicating its
// prefix in the tree, and a hash of its own contents.
//
// In total, then, the byte-level representation of a <160,4> merkle tree
// node is as follows:
//
//      20 bytes       - hash of the remaining bytes in the node
//       1 byte        - type of this node (manifest, file, key, mcert, fcert)
//     1-N bytes       - level of this node in the tree (0 == "root", uleb128)
//    0-20 bytes       - the prefix of this node, 4 bits * level,
//                       rounded up to a byte
//     1-N bytes       - number of leaves under this node (uleb128)
//       4 bytes       - slot-state bitmap of the node
//   0-320 bytes       - between 0 and 16 live slots in the node
//
// So, in the worst case such a node is 367 bytes, with these parameters.
//
//
// Protocol
// --------
//
// The protocol is a binary command-packet system over TCP; each packet
// consists of a single byte which identifies the protocol version, a byte
// which identifies the command name inside that version, a size_t sent as
// a uleb128 indicating the length of the packet, that many bytes of
// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
// 20-byte random key chosen by the client after authentication (discussed
// below). Decoding involves simply buffering until a sufficient number of
// bytes are received, then advancing the buffer pointer. Any time an
// integrity check (the HMAC) fails, the protocol is assumed to have lost
// synchronization, and the connection is dropped. The parties are free to
// drop the TCP stream at any point, if too much data is received or too
// much idle time passes; no commitments or transactions are made.
//
//
// Authentication and setup
// ------------------------
//
// The exchange begins in a non-authenticated state. The server sends a
// "hello <id> <nonce>" command, which identifies the server's RSA key and
// issues a nonce which must be used for a subsequent authentication.
//
// The client then responds with either:
//
// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
// role it wishes to play in the synchronization, identifies the pattern it
// wishes to sync with, signs the previous nonce with its own key, and informs
// the server of the HMAC key it wishes to use for this session (encrypted
// with the server's public key); or
//
// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
// <hmac key>" command, which identifies the role it wishes to play in the
// synchronization, the pattern it wishes to sync with, and the HMAC key it
// wishes to use for this session (also encrypted with the server's public
// key).
//
// The server then replies with a "confirm" command, which contains no
// other data but will only have the correct HMAC integrity code if the
// server received and properly decrypted the HMAC key offered by the
// client. This transitions the peers into an authenticated state and
// begins epoch refinement. If epoch refinement and epoch transmission
// succeed, the peers switch to data refinement and data transmission.
//
//
// Refinement
// ----------
//
// Refinement is executed by "refiners"; there is a refiner for each
// set of 'items' being exchanged: epochs, keys, certs, and revisions.
// When refinement starts, each party knows only their own set of
// items; when refinement completes, each party has learned of the
// complete set of items it needs to send, and a count of items it's
// expecting to receive.
//
// For more details on the refinement process, see refiner.cc.
//
//
// Transmission
// ------------
//
// Once the set of items to send has been determined (for keys, certs, and
// revisions) each peer switches into a transmission mode. This mode
// involves walking the revision graph in ancestry-order and sending all
// the items the local peer has which the remote one does not. Since the
// remote and local peers both know all the items which need to be
// transferred (they learned during refinement) they know what to wait for
// and what to send.  The mechanisms of the transmission phase (notably,
// enumerator.cc) simply ensure that things are sent in the proper order,
// and without over-filling the output buffer too much.
//
//
// Shutdown
// --------
//
// After transmission completes, one special command, "bye", is used to
// shut down a connection gracefully. The shutdown sequence based on "bye"
// commands is documented below in session::process_bye_cmd.
//
//
// Note on epochs
// --------------
//
// One refinement and transmission phase preceeds all the others: epochs.
// Epochs are exchanged and compared in order to be sure that further
// refinement and transmission (on certs and revisions) makes sense; they
// are a sort of "immune system" to prevent incompatible databases (say
// between rebuilds due to bugs in monotone) from cross-contaminating.  The
// later refinements are only kicked off *after* all epochs are received
// and compare correctly.
//
//
// Note on dense coding
// --------------------
//
// This protocol is "raw binary" (non-text) because coding density is
// actually important here, and each packet consists of very
// information-dense material that you wouldn't have a hope of typing in,
// or interpreting manually anyways.
//

using std::auto_ptr;
using std::deque;
using std::make_pair;
using std::map;
using std::min;
using std::pair;
using std::set;
using std::string;
using std::vector;

using boost::shared_ptr;
using boost::lexical_cast;

struct server_initiated_sync_request
{
  string what;
  string address;
  string include;
  string exclude;
};
deque<server_initiated_sync_request> server_initiated_sync_requests;
LUAEXT(server_request_sync, )
{
  char const * w = luaL_checkstring(L, 1);
  char const * a = luaL_checkstring(L, 2);
  char const * i = luaL_checkstring(L, 3);
  char const * e = luaL_checkstring(L, 4);
  server_initiated_sync_request request;
  request.what = string(w);
  request.address = string(a);
  request.include = string(i);
  request.exclude = string(e);
  server_initiated_sync_requests.push_back(request);
  return 0;
}

static inline void
require(bool check, string const & context)
{
  if (!check)
    throw bad_decode(F("check of '%s' failed") % context);
}

static void
read_pubkey(string const & in,
            rsa_keypair_id & id,
            rsa_pub_key & pub)
{
  string tmp_id, tmp_key;
  size_t pos = 0;
  extract_variable_length_string(in, tmp_id, pos, "pubkey id");
  extract_variable_length_string(in, tmp_key, pos, "pubkey value");
  id = rsa_keypair_id(tmp_id);
  pub = rsa_pub_key(tmp_key);
}

static void
write_pubkey(rsa_keypair_id const & id,
             rsa_pub_key const & pub,
             string & out)
{
  insert_variable_length_string(id(), out);
  insert_variable_length_string(pub(), out);
}

struct netsync_error
{
  string msg;
  netsync_error(string const & s): msg(s) {}
};

struct
session:
  public refiner_callbacks,
  public enumerator_callbacks
{
  protocol_role role;
  protocol_voice const voice;
  globish our_include_pattern;
  globish our_exclude_pattern;
  globish_matcher our_matcher;

  project_t & project;
  key_store & keys;
  lua_hooks & lua;
  bool use_transport_auth;
  rsa_keypair_id const & signing_key;
  vector<rsa_keypair_id> const & keys_to_push;

  string peer_id;
  shared_ptr<Netxx::StreamBase> str;

  string_queue inbuf;
  // deque of pair<string data, size_t cur_pos>
  deque< pair<string,size_t> > outbuf;
  // the total data stored in outbuf - this is
  // used as a valve to stop too much data
  // backing up
  size_t outbuf_size;

  netcmd cmd;
  bool armed;
  bool arm();

  bool received_remote_key;
  rsa_keypair_id remote_peer_key_name;
  netsync_session_key session_key;
  chained_hmac read_hmac;
  chained_hmac write_hmac;
  bool authenticated;

  time_t last_io_time;
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;
  size_t bytes_in, bytes_out;
  size_t certs_in, certs_out;
  size_t revs_in, revs_out;
  size_t keys_in, keys_out;
  // used to identify this session to the netsync hooks.
  // We can't just use saved_nonce, because that's blank for all
  // anonymous connections and could lead to confusion.
  size_t session_id;
  static size_t session_count;

  // These are read from the server, written to the local database
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  // These are sent to the server
  vector<revision_id> sent_revisions;
  vector<rsa_keypair_id> sent_keys;
  vector<cert> sent_certs;

  id saved_nonce;

  enum
    {
      working_state,
      shutdown_state,
      confirmed_state
    }
    protocol_state;

  bool encountered_error;

  static const int no_error = 200;
  static const int partial_transfer = 211;
  static const int no_transfer = 212;

  static const int not_permitted = 412;
  static const int unknown_key = 422;
  static const int mixing_versions = 432;

  static const int role_mismatch = 512;
  static const int bad_command = 521;

  static const int failed_identification = 532;
  //static const int bad_data = 541;

  int error_code;

  bool set_totals;

  // Interface to refinement.
  refiner epoch_refiner;
  refiner key_refiner;
  refiner cert_refiner;
  refiner rev_refiner;

  // Interface to ancestry grovelling.
  revision_enumerator rev_enumerator;

  // Enumerator_callbacks methods.
  set<file_id> file_items_sent;
  bool process_this_rev(revision_id const & rev);
  bool queue_this_cert(id const & c);
  bool queue_this_file(id const & f);
  void note_file_data(file_id const & f);
  void note_file_delta(file_id const & src, file_id const & dst);
  void note_rev(revision_id const & rev);
  void note_cert(id const & c);

  session(options & opts,
          lua_hooks & lua,
          project_t & project,
          key_store & keys,
          protocol_role role,
          protocol_voice voice,
          globish const & our_include_pattern,
          globish const & our_exclude_pattern,
          string const & peer,
          shared_ptr<Netxx::StreamBase> sock,
          bool initiated_by_server = false);

  virtual ~session();

  id mk_nonce();
  void mark_recent_io();

  void set_session_key(string const & key);
  void set_session_key(rsa_oaep_sha_data const & key_encrypted);

  void setup_client_tickers();
  bool done_all_refinements();
  bool queued_all_items();
  bool received_all_items();
  bool finished_working();
  void maybe_step();
  void maybe_say_goodbye(transaction_guard & guard);

  void note_item_arrived(netcmd_item_type ty, id const & i);
  void maybe_note_epochs_finished();
  void note_item_sent(netcmd_item_type ty, id const & i);

  Netxx::Probe::ready_type which_events() const;
  bool read_some();
  bool write_some();

  void error(int errcode, string const & errmsg);

  void write_netcmd_and_try_flush(netcmd const & cmd);

  // Outgoing queue-writers.
  void queue_bye_cmd(u8 phase);
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(netcmd_item_type type, size_t n_items);
  void queue_hello_cmd(rsa_keypair_id const & key_name,
                       rsa_pub_key const & pub_encoded,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           globish const & include_pattern,
                           globish const & exclude_pattern,
                           id const & nonce2);
  void queue_auth_cmd(protocol_role role,
                      globish const & include_pattern,
                      globish const & exclude_pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      rsa_sha1_signature const & signature);
  void queue_confirm_cmd();
  void queue_refine_cmd(refinement_type ty, merkle_node const & node);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);

  // Incoming dispatch-called methods.
  bool process_error_cmd(string const & errmsg);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_bye_cmd(u8 phase, transaction_guard & guard);
  bool process_anonymous_cmd(protocol_role role,
                             globish const & their_include_pattern,
                             globish const & their_exclude_pattern);
  bool process_auth_cmd(protocol_role role,
                        globish const & their_include_pattern,
                        globish const & their_exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        rsa_sha1_signature const & signature);
  bool process_refine_cmd(refinement_type ty, merkle_node const & node);
  bool process_done_cmd(netcmd_item_type type, size_t n_items);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_usher_cmd(utf8 const & msg);

  // The incoming dispatcher.
  bool dispatch_payload(netcmd const & cmd,
                        transaction_guard & guard);

  // Various helpers.
  void assume_corresponding_role(protocol_role their_role);
  void respond_to_confirm_cmd();
  bool data_exists(netcmd_item_type type,
                   id const & item);
  void load_data(netcmd_item_type type,
                 id const & item,
                 string & out);

  void rebuild_merkle_trees(set<branch_name> const & branches);

  void send_all_data(netcmd_item_type ty, set<id> const & items);
  void begin_service();
  bool process(transaction_guard & guard);

  bool initiated_by_server;
};
size_t session::session_count = 0;

session::session(options & opts,
                 lua_hooks & lua,
                 project_t & project,
                 key_store & keys,
                 protocol_role role,
                 protocol_voice voice,
                 globish const & our_include_pattern,
                 globish const & our_exclude_pattern,
                 string const & peer,
                 shared_ptr<Netxx::StreamBase> sock,
                 bool initiated_by_server) :
  role(role),
  voice(voice),
  our_include_pattern(our_include_pattern),
  our_exclude_pattern(our_exclude_pattern),
  our_matcher(our_include_pattern, our_exclude_pattern),
  project(project),
  keys(keys),
  lua(lua),
  use_transport_auth(opts.use_transport_auth),
  signing_key(opts.signing_key),
  keys_to_push(opts.keys_to_push),
  peer_id(peer),
  str(sock),
  inbuf(),
  outbuf_size(0),
  armed(false),
  received_remote_key(false),
  remote_peer_key_name(""),
  session_key(constants::netsync_key_initializer),
  read_hmac(netsync_session_key(constants::netsync_key_initializer),
            use_transport_auth),
  write_hmac(netsync_session_key(constants::netsync_key_initializer),
             use_transport_auth),
  authenticated(false),
  last_io_time(::time(NULL)),
  byte_in_ticker(NULL),
  byte_out_ticker(NULL),
  cert_in_ticker(NULL),
  cert_out_ticker(NULL),
  revision_in_ticker(NULL),
  revision_out_ticker(NULL),
  bytes_in(0), bytes_out(0),
  certs_in(0), certs_out(0),
  revs_in(0), revs_out(0),
  keys_in(0), keys_out(0),
  session_id(++session_count),
  saved_nonce(""),
  protocol_state(working_state),
  encountered_error(false),
  error_code(no_transfer),
  set_totals(false),
  epoch_refiner(epoch_item, voice, *this),
  key_refiner(key_item, voice, *this),
  cert_refiner(cert_item, voice, *this),
  rev_refiner(revision_item, voice, *this),
  rev_enumerator(project, *this),
  initiated_by_server(initiated_by_server)
{}

session::~session()
{
  if (protocol_state == confirmed_state)
    error_code = no_error;
  else if (error_code == no_transfer &&
           (revs_in || revs_out ||
            certs_in || certs_out ||
            keys_in || keys_out))
    error_code = partial_transfer;

  vector<cert> unattached_written_certs;
  map<revision_id, vector<cert> > rev_written_certs;
  for (vector<revision_id>::iterator i = written_revisions.begin();
       i != written_revisions.end(); ++i)
    rev_written_certs.insert(make_pair(*i, vector<cert>()));
  for (vector<cert>::iterator i = written_certs.begin();
       i != written_certs.end(); ++i)
    {
      map<revision_id, vector<cert> >::iterator j;
      j = rev_written_certs.find(revision_id(i->ident));
      if (j == rev_written_certs.end())
        unattached_written_certs.push_back(*i);
      else
        j->second.push_back(*i);
    }

  if (!written_keys.empty()
      || !written_revisions.empty()
      || !written_certs.empty())
    {

      //Keys
      for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
           i != written_keys.end(); ++i)
        {
          lua.hook_note_netsync_pubkey_received(*i, session_id);
        }

      //Revisions
      for (vector<revision_id>::iterator i = written_revisions.begin();
           i != written_revisions.end(); ++i)
        {
          vector<cert> & ctmp(rev_written_certs[*i]);
          set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
          for (vector<cert>::const_iterator j = ctmp.begin();
               j != ctmp.end(); ++j)
            certs.insert(make_pair(j->key, make_pair(j->name, j->value)));

          revision_data rdat;
          project.db.get_revision(*i, rdat);
          lua.hook_note_netsync_revision_received(*i, rdat, certs,
                                                  session_id);
        }

      //Certs (not attached to a new revision)
      for (vector<cert>::iterator i = unattached_written_certs.begin();
           i != unattached_written_certs.end(); ++i)
        lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key,
                                            i->name, i->value, session_id);
    }

  if (!sent_keys.empty()
      || !sent_revisions.empty()
      || !sent_certs.empty())
    {

      vector<cert> unattached_sent_certs;
      map<revision_id, vector<cert> > rev_sent_certs;
      for (vector<revision_id>::iterator i = sent_revisions.begin();
           i != sent_revisions.end(); ++i)
        rev_sent_certs.insert(make_pair(*i, vector<cert>()));
      for (vector<cert>::iterator i = sent_certs.begin();
           i != sent_certs.end(); ++i)
        {
          map<revision_id, vector<cert> >::iterator j;
          j = rev_sent_certs.find(revision_id(i->ident));
          if (j == rev_sent_certs.end())
            unattached_sent_certs.push_back(*i);
          else
            j->second.push_back(*i);
        }

      //Keys
      for (vector<rsa_keypair_id>::iterator i = sent_keys.begin();
           i != sent_keys.end(); ++i)
        {
          lua.hook_note_netsync_pubkey_sent(*i, session_id);
        }

      //Revisions
      for (vector<revision_id>::iterator i = sent_revisions.begin();
           i != sent_revisions.end(); ++i)
        {
          vector<cert> & ctmp(rev_sent_certs[*i]);
          set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
          for (vector<cert>::const_iterator j = ctmp.begin();
               j != ctmp.end(); ++j)
            certs.insert(make_pair(j->key, make_pair(j->name, j->value)));

          revision_data rdat;
          project.db.get_revision(*i, rdat);
          lua.hook_note_netsync_revision_sent(*i, rdat, certs,
                                                  session_id);
        }

      //Certs (not attached to a new revision)
      for (vector<cert>::iterator i = unattached_sent_certs.begin();
           i != unattached_sent_certs.end(); ++i)
        lua.hook_note_netsync_cert_sent(revision_id(i->ident), i->key,
                                            i->name, i->value, session_id);
    }

  lua.hook_note_netsync_end(session_id, error_code,
                            bytes_in, bytes_out,
                            certs_in, certs_out,
                            revs_in, revs_out,
                            keys_in, keys_out);
}

bool
session::process_this_rev(revision_id const & rev)
{
  return (rev_refiner.items_to_send.find(rev.inner())
          != rev_refiner.items_to_send.end());
}

bool
session::queue_this_cert(id const & c)
{
  return (cert_refiner.items_to_send.find(c)
          != cert_refiner.items_to_send.end());
}

bool
session::queue_this_file(id const & f)
{
  return file_items_sent.find(file_id(f)) == file_items_sent.end();
}

void
session::note_file_data(file_id const & f)
{
  if (role == sink_role)
    return;
  file_data fd;
  project.db.get_file_version(f, fd);
  queue_data_cmd(file_item, f.inner(), fd.inner()());
  file_items_sent.insert(f);
}

void
session::note_file_delta(file_id const & src, file_id const & dst)
{
  if (role == sink_role)
    return;
  file_delta fdel;
  project.db.get_arbitrary_file_delta(src, dst, fdel);
  queue_delta_cmd(file_item, src.inner(), dst.inner(), fdel.inner());
  file_items_sent.insert(dst);
}

void
session::note_rev(revision_id const & rev)
{
  if (role == sink_role)
    return;
  revision_t rs;
  project.db.get_revision(rev, rs);
  data tmp;
  write_revision(rs, tmp);
  queue_data_cmd(revision_item, rev.inner(), tmp());
  sent_revisions.push_back(rev);
}

void
session::note_cert(id const & c)
{
  if (role == sink_role)
    return;
  revision<cert> cert;
  string str;
  project.db.get_revision_cert(c, cert);
  write_cert(cert.inner(), str);
  queue_data_cmd(cert_item, c, str);
  sent_certs.push_back(cert.inner());
}


id
session::mk_nonce()
{
  I(this->saved_nonce().size() == 0);
  char buf[constants::merkle_hash_length_in_bytes];
  Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
          constants::merkle_hash_length_in_bytes);
  this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes));
  I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
  return this->saved_nonce;
}

void
session::mark_recent_io()
{
  last_io_time = ::time(NULL);
}

void
session::set_session_key(string const & key)
{
  session_key = netsync_session_key(key);
  read_hmac.set_key(session_key);
  write_hmac.set_key(session_key);
}

void
session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
{
  if (use_transport_auth)
    {
      string hmac_key;
      keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key);
      set_session_key(hmac_key);
    }
}

void
session::setup_client_tickers()
{
  // xgettext: please use short message and try to avoid multibytes chars
  byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
  // xgettext: please use short message and try to avoid multibytes chars
  byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
  if (role == sink_role)
    {
      // xgettext: please use short message and try to avoid multibytes chars
      cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
    }
  else if (role == source_role)
    {
      // xgettext: please use short message and try to avoid multibytes chars
      cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
    }
  else
    {
      I(role == source_and_sink_role);
      // xgettext: please use short message and try to avoid multibytes chars
      cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3, false, true));
      // xgettext: please use short message and try to avoid multibytes chars
      cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3, false, true));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
    }
}

bool
session::done_all_refinements()
{
  bool all = rev_refiner.done
    && cert_refiner.done
    && key_refiner.done
    && epoch_refiner.done;

  if (all && !set_totals)
    {
      if (cert_out_ticker.get())
        cert_out_ticker->set_total(cert_refiner.items_to_send.size());

      if (revision_out_ticker.get())
        revision_out_ticker->set_total(rev_refiner.items_to_send.size());

      if (cert_in_ticker.get())
        cert_in_ticker->set_total(cert_refiner.items_to_receive);

      if (revision_in_ticker.get())
        revision_in_ticker->set_total(rev_refiner.items_to_receive);

      set_totals = true;
    }
  return all;
}



bool
session::received_all_items()
{
  if (role == source_role)
    return true;
  bool all = rev_refiner.items_to_receive == 0
    && cert_refiner.items_to_receive == 0
    && key_refiner.items_to_receive == 0
    && epoch_refiner.items_to_receive == 0;
  return all;
}

bool
session::finished_working()
{
  bool all = done_all_refinements()
    && received_all_items()
    && queued_all_items()
    && rev_enumerator.done();
  return all;
}

bool
session::queued_all_items()
{
  if (role == sink_role)
    return true;
  bool all = rev_refiner.items_to_send.empty()
    && cert_refiner.items_to_send.empty()
    && key_refiner.items_to_send.empty()
    && epoch_refiner.items_to_send.empty();
  return all;
}


void
session::maybe_note_epochs_finished()
{
  // Maybe there are outstanding epoch requests.
  // These only matter if we're in sink or source-and-sink mode.
  if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
    return;

  // And maybe we haven't even finished the refinement.
  if (!epoch_refiner.done)
    return;

  // If we ran into an error -- say a mismatched epoch -- don't do any
  // further refinements.
  if (encountered_error)
    return;

  // But otherwise, we're ready to go. Start the next
  // set of refinements.
  if (voice == client_voice)
    {
      L(FL("epoch refinement finished; beginning other refinements"));
      key_refiner.begin_refinement();
      cert_refiner.begin_refinement();
      rev_refiner.begin_refinement();
    }
  else
    L(FL("epoch refinement finished"));
}

static void
decrement_if_nonzero(netcmd_item_type ty,
                     size_t & n)
{
  if (n == 0)
    {
      string typestr;
      netcmd_item_type_to_string(ty, typestr);
      E(false, F("underflow on count of %s items to receive") % typestr);
    }
  --n;
  if (n == 0)
    {
      string typestr;
      netcmd_item_type_to_string(ty, typestr);
      L(FL("count of %s items to receive has reached zero") % typestr);
    }
}

void
session::note_item_arrived(netcmd_item_type ty, id const & ident)
{
  switch (ty)
    {
    case cert_item:
      decrement_if_nonzero(ty, cert_refiner.items_to_receive);
      if (cert_in_ticker.get() != NULL)
        ++(*cert_in_ticker);
      ++certs_in;
      break;
    case revision_item:
      decrement_if_nonzero(ty, rev_refiner.items_to_receive);
      if (revision_in_ticker.get() != NULL)
        ++(*revision_in_ticker);
      ++revs_in;
      break;
    case key_item:
      decrement_if_nonzero(ty, key_refiner.items_to_receive);
      ++keys_in;
      break;
    case epoch_item:
      decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
      break;
    default:
      // No ticker for other things.
      break;
    }
}



void
session::note_item_sent(netcmd_item_type ty, id const & ident)
{
  switch (ty)
    {
    case cert_item:
      cert_refiner.items_to_send.erase(ident);
      if (cert_out_ticker.get() != NULL)
        ++(*cert_out_ticker);
      ++certs_out;
      break;
    case revision_item:
      rev_refiner.items_to_send.erase(ident);
      if (revision_out_ticker.get() != NULL)
        ++(*revision_out_ticker);
      ++revs_out;
      break;
    case key_item:
      key_refiner.items_to_send.erase(ident);
      ++keys_out;
      break;
    case epoch_item:
      epoch_refiner.items_to_send.erase(ident);
      break;
    default:
      // No ticker for other things.
      break;
    }
}

void
session::write_netcmd_and_try_flush(netcmd const & cmd)
{
  if (!encountered_error)
  {
    string buf;
    cmd.write(buf, write_hmac);
    outbuf.push_back(make_pair(buf, 0));
    outbuf_size += buf.size();
  }
  else
    L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
  // FIXME: this helps keep the protocol pipeline full but it seems to
  // interfere with initial and final sequences. careful with it.
  // write_some();
  // read_some();
}

// This method triggers a special "error unwind" mode to netsync.  In this
// mode, all received data is ignored, and no new data is queued.  We simply
// stay connected long enough for the current write buffer to be flushed, to
// ensure that our peer receives the error message.
// Affects read_some, write_some, and process .
void
session::error(int errcode, string const & errmsg)
{
  error_code = errcode;
  throw netsync_error(errmsg);
}

Netxx::Probe::ready_type
session::which_events() const
{
  // Only ask to read if we're not armed.
  if (outbuf.empty())
    {
      if (inbuf.size() < constants::netcmd_maxsz && !armed)
        return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
      else
        return Netxx::Probe::ready_oobd;
    }
  else
    {
      if (inbuf.size() < constants::netcmd_maxsz && !armed)
        return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
      else
        return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
    }
}

bool
session::read_some()
{
  I(inbuf.size() < constants::netcmd_maxsz);
  char tmp[constants::bufsz];
  Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
  if (count > 0)
    {
      L(FL("read %d bytes from fd %d (peer %s)")
        % count % str->get_socketfd() % peer_id);
      if (encountered_error)
        {
          L(FL("in error unwind mode, so throwing them into the bit bucket"));
          return true;
        }
      inbuf.append(tmp,count);
      mark_recent_io();
      if (byte_in_ticker.get() != NULL)
        (*byte_in_ticker) += count;
      bytes_in += count;
      return true;
    }
  else
    return false;
}

bool
session::write_some()
{
  I(!outbuf.empty());
  size_t writelen = outbuf.front().first.size() - outbuf.front().second;
  Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
                                            min(writelen,
                                            constants::bufsz));
  if (count > 0)
    {
      if ((size_t)count == writelen)
        {
          outbuf_size -= outbuf.front().first.size();
          outbuf.pop_front();
        }
      else
        {
          outbuf.front().second += count;
        }
      L(FL("wrote %d bytes to fd %d (peer %s)")
        % count % str->get_socketfd() % peer_id);
      mark_recent_io();
      if (byte_out_ticker.get() != NULL)
        (*byte_out_ticker) += count;
      bytes_out += count;
      if (encountered_error && outbuf.empty())
        {
          // we've flushed our error message, so it's time to get out.
          L(FL("finished flushing output queue in error unwind mode, disconnecting"));
          return false;
        }
      return true;
    }
  else
    return false;
}

// senders

void
session::queue_error_cmd(string const & errmsg)
{
  L(FL("queueing 'error' command"));
  netcmd cmd;
  cmd.write_error_cmd(errmsg);
  write_netcmd_and_try_flush(cmd);
}

void
session::queue_bye_cmd(u8 phase)
{
  L(FL("queueing 'bye' command, phase %d")
    % static_cast<size_t>(phase));
  netcmd cmd;
  cmd.write_bye_cmd(phase);
  write_netcmd_and_try_flush(cmd);
}

void
session::queue_done_cmd(netcmd_item_type type,
                        size_t n_items)
{
  string typestr;
  netcmd_item_type_to_string(type, typestr);
  L(FL("queueing 'done' command for %s (%d items)")
    % typestr % n_items);
  netcmd cmd;
  cmd.write_done_cmd(type, n_items);
  write_netcmd_and_try_flush(cmd);
}

void
session::queue_hello_cmd(rsa_keypair_id const & key_name,
                         rsa_pub_key const & pub,
                         id const & nonce)
{
  if (use_transport_auth)
    cmd.write_hello_cmd(key_name, pub, nonce);
  else
    cmd.write_hello_cmd(key_name, rsa_pub_key(), nonce);
  write_netcmd_and_try_flush(cmd);
}

void
session::queue_anonymous_cmd(protocol_role role,
                             globish const & include_pattern,
                             globish const & exclude_pattern,
                             id const & nonce2)
{
  netcmd cmd;
  rsa_oaep_sha_data hmac_key_encrypted;
  if (use_transport_auth)
    project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted);
  cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
                          hmac_key_encrypted);
  write_netcmd_and_try_flush(cmd);
  set_session_key(nonce2());
}

void
session::queue_auth_cmd(protocol_role role,
                        globish const & include_pattern,
                        globish const & exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        id const & nonce2,
                        rsa_sha1_signature const & signature)
{
  netcmd cmd;
  rsa_oaep_sha_data hmac_key_encrypted;
  I(use_transport_auth);
  project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted);
  cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
                     nonce1, hmac_key_encrypted, signature);
  write_netcmd_and_try_flush(cmd);
  set_session_key(nonce2());
}

void
session::queue_confirm_cmd()
{
  netcmd cmd;
  cmd.write_confirm_cmd();
  write_netcmd_and_try_flush(cmd);
}

void
session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
{
  string typestr;
  hexenc<prefix> hpref;
  node.get_hex_prefix(hpref);
  netcmd_item_type_to_string(node.type, typestr);
  L(FL("queueing refinement %s of %s node '%s', level %d")
    % (ty == refinement_query ? "query" : "response")
    % typestr % hpref() % static_cast<int>(node.level));
  netcmd cmd;
  cmd.write_refine_cmd(ty, node);
  write_netcmd_and_try_flush(cmd);
}

void
session::queue_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat)
{
  string typestr;
  netcmd_item_type_to_string(type, typestr);
  hexenc<id> hid;

  if (global_sanity.debug_p())
    encode_hexenc(item, hid);

  if (role == sink_role)
    {
      L(FL("not queueing %s data for '%s' as we are in pure sink role")
        % typestr % hid());
      return;
    }

  L(FL("queueing %d bytes of data for %s item '%s'")
    % dat.size() % typestr % hid());

  netcmd cmd;
  // TODO: This pair of functions will make two copies of a large
  // file, the first in cmd.write_data_cmd, and the second in
  // write_netcmd_and_try_flush when the data is copied from the
  // cmd.payload variable to the string buffer for output.  This
  // double copy should be collapsed out, it may be better to use
  // a string_queue for output as well as input, as that will reduce
  // the amount of mallocs that happen when the string queue is large
  // enough to just store the data.
  cmd.write_data_cmd(type, item, dat);
  write_netcmd_and_try_flush(cmd);
  note_item_sent(type, item);
}

void
session::queue_delta_cmd(netcmd_item_type type