Below is the file 'netsync.cc' from this revision. You can also download the file.

// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil; c-basic-offset: 2 -*-
// copyright (C) 2004 graydon hoare <graydon@pobox.com>
// all rights reserved.
// licensed to the public under the terms of the GNU GPL (>= 2)
// see the file COPYING for details

#include <map>
#include <string>
#include <memory>
#include <list>
#include <deque>
#include <stack>

#include <time.h>

#include <boost/lexical_cast.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/regex.hpp>

#include "app_state.hh"
#include "cert.hh"
#include "constants.hh"
#include "enumerator.hh"
#include "keys.hh"
#include "merkle_tree.hh"
#include "netcmd.hh"
#include "netio.hh"
#include "netsync.hh"
#include "numeric_vocab.hh"
#include "packet.hh"
#include "refiner.hh"
#include "sanity.hh"
#include "transforms.hh"
#include "ui.hh"
#include "xdelta.hh"
#include "epoch.hh"
#include "platform.hh"
#include "hmac.hh"
#include "globish.hh"

#include "botan/botan.h"

#include "netxx/address.h"
#include "netxx/peer.h"
#include "netxx/probe.h"
#include "netxx/socket.h"
#include "netxx/sockopt.h"
#include "netxx/stream.h"
#include "netxx/streamserver.h"
#include "netxx/timeout.h"

// TODO: things to do that will break protocol compatibility
//   -- need some way to upgrade anonymous to keyed pull, without user having
//      to explicitly specify which they want
//      just having a way to respond "access denied, try again" might work
//      but perhaps better to have the anonymous command include a note "I
//      _could_ use key <...> if you prefer", and if that would lead to more
//      access, could reply "I do prefer".  (Does this lead to too much
//      information exposure?  Allows anonymous people to probe what branches
//      a key has access to.)
//   -- "warning" packet type?
//   -- Richard Levitte wants, when you (e.g.) request '*' but don't have access to
//      all of it, you just get the parts you have access to (maybe with
//      warnings about skipped branches).  to do this right, should have a way
//      for the server to send back to the client "right, you're not getting
//      the following branches: ...", so the client will not include them in
//      its merkle trie.
//   -- add some sort of vhost field to the client's first packet, saying who
//      they expect to talk to
//   -- apparently we have a IANA approved port: 4691.  I guess we should
//      switch to using that.

//
// This is the "new" network synchronization (netsync) system in
// monotone. It is based on synchronizing pairs of merkle trees over an
// interactive connection.
//
// A netsync process between peers treats each peer as either a source, a
// sink, or both. When a peer is only a source, it will not write any new
// items to its database. when a peer is only a sink, it will not send any
// items from its database. When a peer is both a source and sink, it may
// send and write items freely.
//
// The post-state of a netsync is that each sink contains a superset of the
// items in its corresponding source; when peers are behaving as both
// source and sink, this means that the post-state of the sync is for the
// peers to have identical item sets.
//
//
// Data structure
// --------------
//
// Each node in a merkle tree contains a fixed number of slots. this number
// is derived from a global parameter of the protocol -- the tree fanout --
// such that the number of slots is 2^fanout. For now we will assume that
// fanout is 4 thus there are 16 slots in a node, because this makes
// illustration easier. The other parameter of the protocol is the size of
// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
//
// Each slot in a merkle tree node is in one of 3 states:
//
//   - empty
//   - leaf
//   - subtree
//
// In addition, each leaf contains a hash code which identifies an element
// of the set being synchronized. Each subtree slot contains a hash code of
// the node immediately beneath it in the merkle tree. Empty slots contain
// no hash codes.
//
// Since empty slots have no hash code, they are represented implicitly by
// a bitmap at the head of each merkle tree node. As an additional
// integrity check, each merkle tree node contains a label indicating its
// prefix in the tree, and a hash of its own contents.
//
// In total, then, the byte-level representation of a <160,4> merkle tree
// node is as follows:
//
//      20 bytes       - hash of the remaining bytes in the node
//       1 byte        - type of this node (manifest, file, key, mcert, fcert)
//     1-N bytes       - level of this node in the tree (0 == "root", uleb128)
//    0-20 bytes       - the prefix of this node, 4 bits * level,
//                       rounded up to a byte
//     1-N bytes       - number of leaves under this node (uleb128)
//       4 bytes       - slot-state bitmap of the node
//   0-320 bytes       - between 0 and 16 live slots in the node
//
// So, in the worst case such a node is 367 bytes, with these parameters.
//
//
// Protocol
// --------
//
// The protocol is a binary command-packet system over TCP; each packet
// consists of a single byte which identifies the protocol version, a byte
// which identifies the command name inside that version, a size_t sent as
// a uleb128 indicating the length of the packet, that many bytes of
// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
// 20-byte random key chosen by the client after authentication (discussed
// below). Decoding involves simply buffering until a sufficient number of
// bytes are received, then advancing the buffer pointer. Any time an
// integrity check (the HMAC) fails, the protocol is assumed to have lost
// synchronization, and the connection is dropped. The parties are free to
// drop the TCP stream at any point, if too much data is received or too
// much idle time passes; no commitments or transactions are made.
//
//
// Authentication and setup
// ------------------------
//
// The exchange begins in a non-authenticated state. The server sends a
// "hello <id> <nonce>" command, which identifies the server's RSA key and
// issues a nonce which must be used for a subsequent authentication.
//
// The client then responds with either:
//
// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
// role it wishes to play in the synchronization, identifies the pattern it
// wishes to sync with, signs the previous nonce with its own key, and informs
// the server of the HMAC key it wishes to use for this session (encrypted
// with the server's public key); or
//
// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
// <hmac key>" command, which identifies the role it wishes to play in the
// synchronization, the pattern it wishes to sync with, and the HMAC key it
// wishes to use for this session (also encrypted with the server's public
// key).
//
// The server then replies with a "confirm" command, which contains no
// other data but will only have the correct HMAC integrity code if the
// server received and properly decrypted the HMAC key offered by the
// client. This transitions the peers into an authenticated state and
// begins epoch refinement. If epoch refinement and epoch transmission
// succeed, the peers switch to data refinement and data transmission.
//
//
// Refinement
// ----------
//
// Refinement is executed by "refiners"; there is a refiner for each
// set of 'items' being exchanged: epochs, keys, certs, and revisions.
// When refinement starts, each party knows only their own set of
// items; when refinement completes, each party has learned of the
// complete set of items it needs to send, and a count of items it's
// expecting to receive.
//
// For more details on the refinement process, see refiner.cc.
//
//
// Transmission
// ------------
//
// Once the set of items to send has been determined (for keys, certs, and
// revisions) each peer switches into a transmission mode. This mode
// involves walking the revision graph in ancestry-order and sending all
// the items the local peer has which the remote one does not. Since the
// remote and local peers both know all the items which need to be
// transferred (they learned during refinement) they know what to wait for
// and what to send.  The mechanisms of the transmission phase (notably,
// enumerator.cc) simply ensure that things are sent in the proper order,
// and without over-filling the output buffer too much.
//
//
// Shutdown
// --------
//
// After transmission completes, one special command, "bye", is used to
// shut down a connection gracefully. The shutdown sequence based on "bye"
// commands is documented below in session::process_bye_cmd.
//
//
// Note on epochs
// --------------
//
// One refinement and transmission phase precedes all the others: epochs.
// Epochs are exchanged and compared in order to be sure that further
// refinement and transmission (on certs and revisions) makes sense; they
// are a sort of "immune system" to prevent incompatible databases (say
// between rebuilds due to bugs in monotone) from cross-contaminating.  The
// later refinements are only kicked off *after* all epochs are received
// and compare correctly.
//
//
// Note on dense coding
// --------------------
//
// This protocol is "raw binary" (non-text) because coding density is
// actually important here, and each packet consists of very
// information-dense material that you wouldn't have a hope of typing in
// or interpreting manually anyway.
//

using namespace std;
using boost::shared_ptr;
using boost::lexical_cast;

// Decoding guard: when `check` does not hold, abort decoding by throwing
// bad_decode with a message naming the failed `context`.
static inline void
require(bool check, string const & context)
{
  if (check)
    return;
  throw bad_decode(F("check of '%s' failed") % context);
}

// Exception type carrying a human-readable description of a netsync
// protocol failure; session::error() throws it.
struct netsync_error
{
  string msg;
  netsync_error(string const & message)
    : msg(message)
  {}
};

// State for one netsync connection with a single peer.  A session owns the
// socket stream and its input/output buffers, the HMAC chains, one refiner
// per item type (epochs, keys, certs, revisions) and the revision
// enumerator that drives transmission.  It is itself the callback object
// handed to the refiners and to the enumerator (see the constructor).
struct
session:
  public refiner_callbacks,
  public enumerator_callbacks
{
  // Whether we act as a source, a sink, or both (see the file header).
  protocol_role role;
  // Passed to each refiner; presumably distinguishes the client and server
  // sides of the connection -- confirm against netsync.hh.
  protocol_voice const voice;
  // NOTE(review): stored by reference, so the caller must keep these
  // pattern objects alive for the whole lifetime of the session.
  utf8 const & our_include_pattern;
  utf8 const & our_exclude_pattern;
  // Matcher built from the two patterns above.
  globish_matcher our_matcher;
  app_state & app;

  // Peer description and raw socket handle (both used in log messages),
  // and the stream wrapping that socket.
  string peer_id;
  Netxx::socket_type fd;
  Netxx::Stream str;

  // Bytes read from the peer by read_some(); presumably consumed by arm()
  // as complete commands are decoded.
  string_queue inbuf;
  // deque of pair<string data, size_t cur_pos>
  deque< pair<string,size_t> > outbuf;
  // the total data stored in outbuf - this is
  // used as a valve to stop too much data
  // backing up
  size_t outbuf_size;

  // Command decoding state.  arm() is defined elsewhere; `armed` appears
  // to mean `cmd` holds a command awaiting dispatch -- which_events() stops
  // asking to read while it is set.  TODO confirm.
  netcmd cmd;
  bool armed;
  bool arm();

  // Peer identity learned during authentication, and the session key that
  // keys both HMAC chains (installed by set_session_key).
  id remote_peer_key_hash;
  rsa_keypair_id remote_peer_key_name;
  netsync_session_key session_key;
  chained_hmac read_hmac;
  chained_hmac write_hmac;
  bool authenticated;

  // Refreshed by mark_recent_io() after every successful read or write.
  time_t last_io_time;
  // Progress tickers; which ones are created depends on role (see
  // setup_client_tickers).  Null when not in use.
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;

  // Items stored in our database during this session.  They are collected
  // by the packet_db_writer callbacks and reported to the Lua hooks by the
  // destructor.
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  // Nonce issued by mk_nonce(); empty until then.
  id saved_nonce;
  // Writer for incoming packets; its "written" callbacks fill the vectors
  // above.
  packet_db_writer dbw;

  enum
    {
      working_state,
      shutdown_state,
      confirmed_state
    }
    protocol_state;
  // Once set, outgoing commands are dropped and incoming bytes discarded
  // (see write_netcmd_and_try_flush and read_some).
  bool encountered_error;
  // Whether the ticker totals have already been set (done once, in
  // done_all_refinements).
  bool set_totals;

  // Interface to refinement.
  refiner epoch_refiner;
  refiner key_refiner;
  refiner cert_refiner;
  refiner rev_refiner;

  // Interface to ancestry grovelling.
  revision_enumerator rev_enumerator;

  // Enumerator_callbacks methods.
  set<file_id> file_items_sent;
  bool process_this_rev(revision_id const & rev);
  bool queue_this_cert(hexenc<id> const & c);
  bool queue_this_file(hexenc<id> const & f);
  void note_file_data(file_id const & f);
  void note_file_delta(file_id const & src, file_id const & dst);
  void note_rev(revision_id const & rev);
  void note_cert(hexenc<id> const & c);

  session(protocol_role role,
          protocol_voice voice,
          utf8 const & our_include_pattern,
          utf8 const & our_exclude_pattern,
          app_state & app,
          string const & peer,
          Netxx::socket_type sock,
          Netxx::Timeout const & to);

  virtual ~session();

  void rev_written_callback(revision_id rid);
  void key_written_callback(rsa_keypair_id kid);
  void cert_written_callback(cert const & c);

  id mk_nonce();
  void mark_recent_io();

  void set_session_key(string const & key);
  void set_session_key(rsa_oaep_sha_data const & key_encrypted);

  void setup_client_tickers();
  bool done_all_refinements();
  bool queued_all_items();
  bool received_all_items();
  bool finished_working();
  void maybe_step();
  void maybe_say_goodbye(transaction_guard & guard);

  void note_item_arrived(netcmd_item_type ty, id const & i);
  void maybe_note_epochs_finished();
  void note_item_sent(netcmd_item_type ty, id const & i);

  Netxx::Probe::ready_type which_events() const;
  bool read_some();
  bool write_some();

  void error(string const & errmsg);

  void write_netcmd_and_try_flush(netcmd const & cmd);

  // Outgoing queue-writers.
  void queue_bye_cmd(u8 phase);
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(netcmd_item_type type, size_t n_items);
  void queue_hello_cmd(rsa_keypair_id const & key_name,
                       base64<rsa_pub_key> const & pub_encoded,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           utf8 const & include_pattern,
                           utf8 const & exclude_pattern,
                           id const & nonce2,
                           base64<rsa_pub_key> server_key_encoded);
  void queue_auth_cmd(protocol_role role,
                      utf8 const & include_pattern,
                      utf8 const & exclude_pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      string const & signature,
                      base64<rsa_pub_key> server_key_encoded);
  void queue_confirm_cmd();
  void queue_refine_cmd(refinement_type ty, merkle_node const & node);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);

  // Incoming dispatch-called methods.
  bool process_error_cmd(string const & errmsg);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_bye_cmd(u8 phase, transaction_guard & guard);
  bool process_anonymous_cmd(protocol_role role,
                             utf8 const & their_include_pattern,
                             utf8 const & their_exclude_pattern);
  bool process_auth_cmd(protocol_role role,
                        utf8 const & their_include_pattern,
                        utf8 const & their_exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        string const & signature);
  bool process_refine_cmd(refinement_type ty, merkle_node const & node);
  bool process_done_cmd(netcmd_item_type type, size_t n_items);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_usher_cmd(utf8 const & msg);

  // The incoming dispatcher.
  bool dispatch_payload(netcmd const & cmd,
                        transaction_guard & guard);

  // Various helpers.
  void respond_to_confirm_cmd();
  void rebuild_merkle_trees(app_state & app,
                            set<utf8> const & branches);

  void send_all_data(netcmd_item_type ty, set<id> const & items);
  void begin_service();
  bool process(transaction_guard & guard);
};


// Builds the session for one connection.  C++ initialises members in
// declaration order, and the initialiser list below follows that order.
// The pattern references bind to the caller's objects, which must outlive
// the session.  The HMAC chains start out keyed with
// constants::netsync_key_initializer.  The refiners and the revision
// enumerator receive *this as their callback object.
session::session(protocol_role role,
                 protocol_voice voice,
                 utf8 const & our_include_pattern,
                 utf8 const & our_exclude_pattern,
                 app_state & app,
                 string const & peer,
                 Netxx::socket_type sock,
                 Netxx::Timeout const & to) :
  role(role),
  voice(voice),
  our_include_pattern(our_include_pattern),
  our_exclude_pattern(our_exclude_pattern),
  our_matcher(our_include_pattern, our_exclude_pattern),
  app(app),
  peer_id(peer),
  fd(sock),
  str(sock, to),
  inbuf(),
  outbuf_size(0),
  armed(false),
  remote_peer_key_hash(""),
  remote_peer_key_name(""),
  session_key(constants::netsync_key_initializer),
  read_hmac(constants::netsync_key_initializer),
  write_hmac(constants::netsync_key_initializer),
  authenticated(false),
  last_io_time(::time(NULL)),
  byte_in_ticker(NULL),
  byte_out_ticker(NULL),
  cert_in_ticker(NULL),
  cert_out_ticker(NULL),
  revision_in_ticker(NULL),
  revision_out_ticker(NULL),
  saved_nonce(""),
  dbw(app),
  protocol_state(working_state),
  encountered_error(false),
  set_totals(false),
  epoch_refiner(epoch_item, voice, *this),
  key_refiner(key_item, voice, *this),
  cert_refiner(cert_item, voice, *this),
  rev_refiner(revision_item, voice, *this),
  rev_enumerator(*this, app)
{
  // Have the database writer tell us about every revision, cert and public
  // key it stores, so the destructor can report them to the Lua hooks.
  dbw.set_on_revision_written(boost::bind(&session::rev_written_callback,
                                          this, _1));
  dbw.set_on_cert_written(boost::bind(&session::cert_written_callback,
                                      this, _1));
  dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback,
                                        this, _1));
}

// On teardown, report everything this session wrote to the database to the
// Lua hooks.  Keys are reported first, then each new revision together with
// the certs received for it, and finally the certs that belong to revisions
// not written in this session.
session::~session()
{
  // Group the written certs under the new revision they name.  Certs whose
  // revision was not written in this session go into orphan_certs.
  map<revision_id, vector<cert> > certs_by_rev;
  vector<cert> orphan_certs;

  for (vector<revision_id>::iterator r = written_revisions.begin();
       r != written_revisions.end(); ++r)
    certs_by_rev.insert(make_pair(*r, vector<cert>()));

  for (vector<cert>::iterator c = written_certs.begin();
       c != written_certs.end(); ++c)
    {
      map<revision_id, vector<cert> >::iterator slot
        = certs_by_rev.find(c->ident);
      if (slot != certs_by_rev.end())
        slot->second.push_back(*c);
      else
        orphan_certs.push_back(*c);
    }

  // Keys.
  for (vector<rsa_keypair_id>::iterator k = written_keys.begin();
       k != written_keys.end(); ++k)
    app.lua.hook_note_netsync_pubkey_received(*k);

  // Revisions, each with its decoded certs.
  for (vector<revision_id>::iterator r = written_revisions.begin();
       r != written_revisions.end(); ++r)
    {
      set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > decoded;
      vector<cert> const & attached = certs_by_rev[*r];
      for (vector<cert>::const_iterator c = attached.begin();
           c != attached.end(); ++c)
        {
          cert_value plain;
          decode_base64(c->value, plain);
          decoded.insert(make_pair(c->key, make_pair(c->name, plain)));
        }
      revision_data contents;
      app.db.get_revision(*r, contents);
      app.lua.hook_note_netsync_revision_received(*r, contents, decoded);
    }

  // Certs not attached to a revision from this session.
  for (vector<cert>::iterator c = orphan_certs.begin();
       c != orphan_certs.end(); ++c)
    {
      cert_value plain;
      decode_base64(c->value, plain);
      app.lua.hook_note_netsync_cert_received(c->ident, c->key,
                                              c->name, plain);
    }
}

// Enumerator callback: a revision is processed only if refinement put it
// into the set of revisions we must send.
bool
session::process_this_rev(revision_id const & rev)
{
  id raw;
  decode_hexenc(rev.inner(), raw);
  return rev_refiner.items_to_send.count(raw) != 0;
}

// Enumerator callback: a cert is queued only if refinement put it into the
// set of certs we must send.
bool
session::queue_this_cert(hexenc<id> const & c)
{
  id raw;
  decode_hexenc(c, raw);
  return cert_refiner.items_to_send.count(raw) != 0;
}

// Enumerator callback: queue a file only if we have not already sent it
// (as full data or as a delta) in this session.
bool
session::queue_this_file(hexenc<id> const & f)
{
  return file_items_sent.count(f) == 0;
}

// Enumerator callback: send the full contents of file `f` (unless we are a
// pure sink) and remember that it has been sent.
void
session::note_file_data(file_id const & f)
{
  if (role != sink_role)
    {
      id raw;
      decode_hexenc(f.inner(), raw);
      file_data contents;
      app.db.get_file_version(f, contents);
      queue_data_cmd(file_item, raw, contents.inner()());
      file_items_sent.insert(f);
    }
}

// Enumerator callback: send `dst` as a delta against `src` (unless we are a
// pure sink) and remember that `dst` has been sent.
void
session::note_file_delta(file_id const & src, file_id const & dst)
{
  if (role == sink_role)
    return;

  file_delta patch;
  app.db.get_file_delta(dst, src, patch);

  id raw_src;
  id raw_dst;
  decode_hexenc(src.inner(), raw_src);
  decode_hexenc(dst.inner(), raw_dst);

  queue_delta_cmd(file_item, raw_src, raw_dst, patch.inner());
  file_items_sent.insert(dst);
}

// Enumerator callback: serialise revision `rev` and queue it for sending,
// unless we are a pure sink.
void
session::note_rev(revision_id const & rev)
{
  if (role == sink_role)
    return;

  id raw;
  decode_hexenc(rev.inner(), raw);

  revision_set rset;
  app.db.get_revision(rev, rset);

  data serialised;
  write_revision_set(rset, serialised);
  queue_data_cmd(revision_item, raw, serialised());
}

// Enumerator callback: load cert `c`, serialise it and queue it for sending,
// unless we are a pure sink.
void
session::note_cert(hexenc<id> const & c)
{
  if (role == sink_role)
    return;

  id raw;
  decode_hexenc(c, raw);

  revision<cert> stored;
  app.db.get_revision_cert(c, stored);

  string packed;
  write_cert(stored.inner(), packed);
  queue_data_cmd(cert_item, raw, packed);
}


// packet_db_writer hook: remember a revision stored during this session.
void session::rev_written_callback(revision_id rid)
{
  written_revisions.insert(written_revisions.end(), rid);
}

// packet_db_writer hook: remember a public key stored during this session.
void session::key_written_callback(rsa_keypair_id kid)
{
  written_keys.insert(written_keys.end(), kid);
}

// packet_db_writer hook: remember a cert stored during this session.
void session::cert_written_callback(cert const & c)
{
  written_certs.insert(written_certs.end(), c);
}

// Generate, remember and return a random nonce of merkle-hash length.  A
// session may issue only one nonce; this is asserted.
id
session::mk_nonce()
{
  I(this->saved_nonce().size() == 0);

  char raw[constants::merkle_hash_length_in_bytes];
  Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(raw),
                               sizeof(raw));
  this->saved_nonce = string(raw, raw + sizeof(raw));

  I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
  return this->saved_nonce;
}

// Record the current wall-clock time as the moment of the last I/O.
void
session::mark_recent_io()
{
  time_t const now = ::time(NULL);
  last_io_time = now;
}

// Install `key` as the session key and re-key both HMAC chains with it.
void
session::set_session_key(string const & key)
{
  session_key = netsync_session_key(key);
  write_hmac.set_key(session_key);
  read_hmac.set_key(session_key);
}

// Decrypt an RSA-OAEP-encrypted HMAC key using our own signing key pair,
// then install it as the session key.
void
session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
{
  keypair ours;
  load_key_pair(app, app.signing_key, ours);

  string plain_key;
  decrypt_rsa(app.lua, app.signing_key, ours.priv,
              hmac_key_encrypted, plain_key);

  set_session_key(plain_key);
}

// Create the progress tickers shown on the client.  Byte tickers are
// created for every role.  Item tickers follow the direction of transfer:
// sinks count certs and revisions in, sources count them out, and
// source-and-sink sessions count only revisions in both directions.
void
session::setup_client_tickers()
{
  // xgettext: please use short message and try to avoid multibytes chars
  byte_in_ticker.reset(new ticker(_("bytes in"), ">", 1024, true));
  // xgettext: please use short message and try to avoid multibytes chars
  byte_out_ticker.reset(new ticker(_("bytes out"), "<", 1024, true));

  switch (role)
    {
    case sink_role:
      // xgettext: please use short message and try to avoid multibytes chars
      cert_in_ticker.reset(new ticker(_("certs in"), "c", 3));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_in_ticker.reset(new ticker(_("revs in"), "r", 1));
      break;

    case source_role:
      // xgettext: please use short message and try to avoid multibytes chars
      cert_out_ticker.reset(new ticker(_("certs out"), "C", 3));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_out_ticker.reset(new ticker(_("revs out"), "R", 1));
      break;

    default:
      I(role == source_and_sink_role);
      // xgettext: please use short message and try to avoid multibytes chars
      revision_in_ticker.reset(new ticker(_("revs in"), "r", 1));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_out_ticker.reset(new ticker(_("revs out"), "R", 1));
      break;
    }
}

// True once all four refiners (revisions, certs, keys, epochs) have
// finished.  The first time this is observed, the send/receive counts from
// refinement are copied into whichever item tickers exist.
bool
session::done_all_refinements()
{
  if (!rev_refiner.done
      || !cert_refiner.done
      || !key_refiner.done
      || !epoch_refiner.done)
    return false;

  if (!set_totals)
    {
      if (cert_out_ticker.get())
        cert_out_ticker->set_total(cert_refiner.items_to_send.size());

      if (revision_out_ticker.get())
        revision_out_ticker->set_total(rev_refiner.items_to_send.size());

      if (cert_in_ticker.get())
        cert_in_ticker->set_total(cert_refiner.items_to_receive);

      if (revision_in_ticker.get())
        revision_in_ticker->set_total(rev_refiner.items_to_receive);

      set_totals = true;
    }
  return true;
}



// True when no more items are expected from the peer.  A pure source
// expects nothing, so this is always true for it.
bool
session::received_all_items()
{
  if (role == source_role)
    return true;

  bool const still_waiting = rev_refiner.items_to_receive != 0
    || cert_refiner.items_to_receive != 0
    || key_refiner.items_to_receive != 0
    || epoch_refiner.items_to_receive != 0;
  return !still_waiting;
}

// True when refinement is complete, everything expected has arrived,
// everything we owe has been queued, and the revision enumerator has
// finished.  The checks are made in this order and stop at the first
// failure; done_all_refinements() may update ticker totals as a side effect.
bool
session::finished_working()
{
  if (!done_all_refinements())
    return false;
  if (!received_all_items())
    return false;
  if (!queued_all_items())
    return false;
  return rev_enumerator.done();
}

// True when nothing remains in any refiner's send set.  A pure sink never
// sends, so this is always true for it.
bool
session::queued_all_items()
{
  if (role == sink_role)
    return true;

  bool const pending = !rev_refiner.items_to_send.empty()
    || !cert_refiner.items_to_send.empty()
    || !key_refiner.items_to_send.empty()
    || !epoch_refiner.items_to_send.empty();
  return !pending;
}


// Start key, cert and revision refinement once the epoch phase is complete:
// all announced epochs have arrived, epoch refinement itself is done, and no
// error (such as an epoch mismatch) has occurred.  Otherwise do nothing.
void
session::maybe_note_epochs_finished()
{
  // Maybe there are outstanding epoch requests.  (Written as a plain
  // comparison: `!x == 0` would parse as `(!x) == 0`.)
  if (epoch_refiner.items_to_receive != 0)
    return;

  // And maybe we haven't even finished the refinement.
  if (!epoch_refiner.done)
    return;

  // If we ran into an error -- say a mismatched epoch -- don't do any
  // further refinements.
  if (encountered_error)
    return;

  // But otherwise, we're ready to go. Start the next
  // set of refinements.
  key_refiner.begin_refinement();
  cert_refiner.begin_refinement();
  rev_refiner.begin_refinement();
}

// Decrement the count `n` of items of type `ty` still expected from the
// peer.  Receiving an item when the count is already zero is a protocol
// error, reported via E() with the item type's name.
static void
decrement_if_nonzero(netcmd_item_type ty,
                     size_t & n)
{
  if (n > 0)
    {
      --n;
      return;
    }

  string type_name;
  netcmd_item_type_to_string(ty, type_name);
  E(false, F("underflow on count of %s items to receive") % type_name);
  --n;
}

// Account for an item received from the peer.  The owning refiner's
// pending-receive count is decremented (underflow is an error), and the
// matching inbound ticker, if any, is bumped.  Item types without a refiner
// here are ignored.
void
session::note_item_arrived(netcmd_item_type ty, id const & ident)
{
  refiner * owner = NULL;
  ticker * progress = NULL;

  switch (ty)
    {
    case cert_item:
      owner = &cert_refiner;
      progress = cert_in_ticker.get();
      break;
    case revision_item:
      owner = &rev_refiner;
      progress = revision_in_ticker.get();
      break;
    case key_item:
      owner = &key_refiner;
      break;
    case epoch_item:
      owner = &epoch_refiner;
      break;
    default:
      // No ticker for other things.
      break;
    }

  if (owner == NULL)
    return;

  decrement_if_nonzero(ty, owner->items_to_receive);
  if (progress != NULL)
    ++(*progress);
}



// Account for an item queued to the peer.  It is removed from the owning
// refiner's send set, and the matching outbound ticker, if any, is bumped.
// Item types without a refiner here are ignored.
void
session::note_item_sent(netcmd_item_type ty, id const & ident)
{
  refiner * owner = NULL;
  ticker * progress = NULL;

  switch (ty)
    {
    case cert_item:
      owner = &cert_refiner;
      progress = cert_out_ticker.get();
      break;
    case revision_item:
      owner = &rev_refiner;
      progress = revision_out_ticker.get();
      break;
    case key_item:
      owner = &key_refiner;
      break;
    case epoch_item:
      owner = &epoch_refiner;
      break;
    default:
      // No ticker for other things.
      break;
    }

  if (owner == NULL)
    return;

  owner->items_to_send.erase(ident);
  if (progress != NULL)
    ++(*progress);
}

// Serialise `cmd` with the current write HMAC and append it to the output
// queue.  Despite the name, nothing is flushed here (see the FIXME).  In
// error-unwind mode the command is dropped instead.
void
session::write_netcmd_and_try_flush(netcmd const & cmd)
{
  if (encountered_error)
    {
      L(FL("dropping outgoing netcmd (because we're in error unwind mode)\n"));
    }
  else
    {
      string encoded;
      cmd.write(encoded, write_hmac);
      outbuf.push_back(make_pair(encoded, 0));
      outbuf_size += encoded.size();
    }
  // FIXME: this helps keep the protocol pipeline full but it seems to
  // interfere with initial and final sequences. careful with it.
  // write_some();
  // read_some();
}

// This method triggers a special "error unwind" mode to netsync.  In this
// mode, all received data is ignored, and no new data is queued.  We simply
// stay connected long enough for the current write buffer to be flushed, to
// ensure that our peer receives the error message.
// Affects read_some, write_some, and process .
void
session::error(std::string const & errmsg)
{
  throw netsync_error(errmsg);
}


// Compute the socket events to wait for.  Out-of-band data is always of
// interest.  Reading is requested only while not armed and while the input
// buffer is below the maximum command size.  Writing is requested only
// when output is queued.
Netxx::Probe::ready_type
session::which_events() const
{
  bool const want_read = !armed && inbuf.size() < constants::netcmd_maxsz;
  bool const want_write = !outbuf.empty();

  if (want_write && want_read)
    return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
  if (want_write)
    return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
  if (want_read)
    return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
  return Netxx::Probe::ready_oobd;
}

bool
session::read_some()
{
  I(inbuf.size() < constants::netcmd_maxsz);
  char tmp[constants::bufsz];
  Netxx::signed_size_type count = str.read(tmp, sizeof(tmp));
  if (count > 0)
    {
      L(FL("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id);
      if (encountered_error)
        {
          L(FL("in error unwind mode, so throwing them into the bit bucket\n"));
          return true;
        }
      inbuf.append(tmp,count);
      mark_recent_io();
      if (byte_in_ticker.get() != NULL)
        (*byte_in_ticker) += count;
      return true;
    }
  else
    return false;
}

bool
session::write_some()
{
  I(!outbuf.empty());
  size_t writelen = outbuf.front().first.size() - outbuf.front().second;
  Netxx::signed_size_type count = str.write(outbuf.front().first.data() + outbuf.front().second,
                                            std::min(writelen,
                                            constants::bufsz));
  if (count > 0)
    {
      if ((size_t)count == writelen)
        {
          outbuf_size -= outbuf.front().first.size();
          outbuf.pop_front();
        }
      else
        {
          outbuf.front().second += count;
        }
      L(FL("wrote %d bytes to fd %d (peer %s)\n")
        % count % fd % peer_id);
      mark_recent_io();
      if (byte_out_ticker.get() != NULL)
        (*byte_out_ticker) += count;
      if (encountered_error && outbuf.empty())
        {
          // we've flushed our error message, so it's time to get out.
          L(FL("finished flushing output queue in error unwind mode, disconnecting\n"));
          return false;
        }
      return true;
    }
  else
    return false;
}

// senders

// Queue an 'error' netcmd carrying errmsg and try to flush it immediately.
void
session::queue_error_cmd(string const & errmsg)
{
  L(FL("queueing 'error' command\n"));

  netcmd outgoing;
  outgoing.write_error_cmd(errmsg);
  write_netcmd_and_try_flush(outgoing);
}

// Queue a 'bye' netcmd for the given shutdown phase and try to flush it.
void
session::queue_bye_cmd(u8 phase)
{
  // widen the phase so it formats as a number rather than a character
  size_t const phase_num = static_cast<size_t>(phase);
  L(FL("queueing 'bye' command, phase %d\n") % phase_num);

  netcmd outgoing;
  outgoing.write_bye_cmd(phase);
  write_netcmd_and_try_flush(outgoing);
}

// Queue a 'done' netcmd announcing that n_items of the given item type
// have been handled, and try to flush it.
void
session::queue_done_cmd(netcmd_item_type type,
                        size_t n_items)
{
  string type_name;
  netcmd_item_type_to_string(type, type_name);
  L(FL("queueing 'done' command for %s (%d items)\n") % type_name % n_items);

  netcmd outgoing;
  outgoing.write_done_cmd(type, n_items);
  write_netcmd_and_try_flush(outgoing);
}

// Queue a 'hello' netcmd and try to flush it.
//
//   key_name    - name of our key
//   pub_encoded - our public key, base64-encoded; it is decoded to raw form
//                 before being placed in the command
//   nonce       - the nonce sent to the peer
void
session::queue_hello_cmd(rsa_keypair_id const & key_name,
                         base64<rsa_pub_key> const & pub_encoded,
                         id const & nonce)
{
  // Build the command in a local netcmd, as every other queue_*_cmd does.
  // Without this declaration 'cmd' resolved to a name outside this function
  // (NOTE(review): presumably the session's inbound command buffer), which
  // was then overwritten with the outgoing hello.
  netcmd cmd;
  rsa_pub_key pub;
  decode_base64(pub_encoded, pub);
  cmd.write_hello_cmd(key_name, pub, nonce);
  write_netcmd_and_try_flush(cmd);
}

// Queue an 'anonymous' netcmd requesting the given role and branch patterns.
//
// nonce2 is encrypted with the server's public key (server_key_encoded) and
// sent along. After the command is queued, nonce2 is installed as the
// session key.
void
session::queue_anonymous_cmd(protocol_role role,
                             utf8 const & include_pattern,
                             utf8 const & exclude_pattern,
                             id const & nonce2,
                             base64<rsa_pub_key> server_key_encoded)
{
  rsa_oaep_sha_data encrypted_key;
  encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
              nonce2(), encrypted_key);

  netcmd outgoing;
  outgoing.write_anonymous_cmd(role, include_pattern, exclude_pattern,
                               encrypted_key);
  write_netcmd_and_try_flush(outgoing);

  // switch keys only after this command has been queued
  set_session_key(nonce2());
}

// Queue an 'auth' netcmd for a keyed connection.
//
// The command carries the requested role and branch patterns, our key's
// identifier (client), the peer's nonce (nonce1) together with our signature
// over it, and nonce2 encrypted with the server's public key. After the
// command is queued, nonce2 is installed as the session key.
void
session::queue_auth_cmd(protocol_role role,
                        utf8 const & include_pattern,
                        utf8 const & exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        id const & nonce2,
                        string const & signature,
                        base64<rsa_pub_key> server_key_encoded)
{
  rsa_oaep_sha_data encrypted_key;
  encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
              nonce2(), encrypted_key);

  netcmd outgoing;
  outgoing.write_auth_cmd(role, include_pattern, exclude_pattern, client,
                          nonce1, encrypted_key, signature);
  write_netcmd_and_try_flush(outgoing);

  // switch keys only after this command has been queued
  set_session_key(nonce2());
}

void
session::queue_confirm_cmd()
{
  netcmd cmd;
  cmd.write_confirm_cmd();
  write_netcmd_and_try_flush(cmd);
}

// Queue a 'refine' netcmd (either a query or a response, per ty) carrying
// the given merkle node, and try to flush it.
void
session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
{
  hexenc<prefix> hpref;
  node.get_hex_prefix(hpref);

  string type_name;
  netcmd_item_type_to_string(node.type, type_name);

  char const * direction = (ty == refinement_query) ? "query" : "response";
  L(FL("queueing refinement %s of %s node '%s', level %d\n")
    % direction % type_name % hpref % static_cast<int>(node.level));

  netcmd outgoing;
  outgoing.write_refine_cmd(ty, node);
  write_netcmd_and_try_flush(outgoing);
}

// Queue a 'data' netcmd carrying the full contents dat of the given item.
// When this session plays the pure sink role nothing is sent (only a log
// line is written). Otherwise the item is recorded via note_item_sent once
// queued.
void
session::queue_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat)
{
  string typestr;
  netcmd_item_type_to_string(type, typestr);
  hexenc<id> hid;
  encode_hexenc(item, hid);

  if (role != sink_role)
    {
      L(FL("queueing %d bytes of data for %s item '%s'\n")
        % dat.size() % typestr % hid);

      netcmd outgoing;
      // TODO: This pair of functions will make two copies of a large
      // file, the first in cmd.write_data_cmd, and the second in
      // write_netcmd_and_try_flush when the data is copied from the
      // cmd.payload variable to the string buffer for output.  This
      // double copy should be collapsed out, it may be better to use
      // a string_queue for output as well as input, as that will reduce
      // the amount of mallocs that happen when the string queue is large
      // enough to just store the data.
      outgoing.write_data_cmd(type, item, dat);
      write_netcmd_and_try_flush(outgoing);
      note_item_sent(type, item);
      return;
    }

  L(FL("not queueing %s data for '%s' as we are in pure sink role\n")
    % typestr % hid);
}

// Queue a 'delta' netcmd describing how to turn item 'base' into item
// 'ident' via del. Only file items are accepted, and an empty delta is
// allowed only when base == ident. In the pure sink role nothing is sent
// (only a log line is written). Otherwise ident is recorded via
// note_item_sent once queued.
void
session::queue_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del)
{
  I(type == file_item);
  I(! del().empty() || ident == base);

  string typestr;
  netcmd_item_type_to_string(type, typestr);
  hexenc<id> base_hid;
  hexenc<id> ident_hid;
  encode_hexenc(base, base_hid);
  encode_hexenc(ident, ident_hid);

  if (role != sink_role)
    {
      L(FL("queueing %s delta '%s' -> '%s'\n")
        % typestr % base_hid % ident_hid);

      netcmd outgoing;
      outgoing.write_delta_cmd(type, base, ident, del);
      write_netcmd_and_try_flush(outgoing);
      note_item_sent(type, ident);
      return;
    }

  L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role\n")
    % typestr % base_hid % ident_hid);
}


// processors

// Handle an 'error' netcmd from the peer by throwing bad_decode with the
// peer's message. This function never returns normally.
bool
session::process_error_cmd(string const & errmsg)
{
  throw bad_decode(F("received network error: %s")
                   % errmsg);
}

// Fill 'names' with the branch names reported by the database, sorted
// lexicographically.
void
get_branches(app_state & app, vector<string> & names)
{
  app.db.get_branches(names);
  std::sort(names.begin(), names.end());
}

// Database variable domain holding the key hash remembered for each server,
// keyed by peer_id (consulted when a 'hello' arrives).
static const var_domain known_servers_domain("known-servers");

// Client-side handler for the server's 'hello' netcmd.
//
// Compares the server's key hash with the one remembered for this peer in
// the "known-servers" database domain. On first contact the hash is
// recorded; if it differs from the recorded one, a warning is printed and
// the connection is rejected via E(false, ...). The server's public key is
// stored if the database does not already have it. The merkle trees are
// rebuilt for the local branches accepted by our_matcher. Finally the
// reply is queued: a signed 'auth' command when a signing key is
// configured, otherwise an 'anonymous' command.
//
//   their_keyname - name of the server's key
//   their_key     - the server's public key in raw (non-base64) form
//   nonce         - the server's nonce; it is signed in the 'auth' reply
//
// Returns true once the reply has been queued.
bool
session::process_hello_cmd(rsa_keypair_id const & their_keyname,
                           rsa_pub_key const & their_key,
                           id const & nonce)
{
  // a hello is only expected before any peer key or nonce has been recorded
  I(this->remote_peer_key_hash().size() == 0);
  I(this->saved_nonce().size() == 0);

  hexenc<id> their_key_hash;
  base64<rsa_pub_key> their_key_encoded;
  encode_base64(their_key, their_key_encoded);
  key_hash_code(their_keyname, their_key_encoded, their_key_hash);
  L(FL("server key has name %s, hash %s\n") % their_keyname % their_key_hash);
  // the key hash we remember for this server is stored under its peer_id
  var_key their_key_key(known_servers_domain, var_name(peer_id));
  if (app.db.var_exists(their_key_key))
    {
      var_value expected_key_hash;
      app.db.get_var(their_key_key, expected_key_hash);
      if (expected_key_hash() != their_key_hash())
        {
          P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
              "@ WARNING: SERVER IDENTIFICATION HAS CHANGED              @\n"
              "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
              "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
              "it is also possible that the server key has just been changed\n"
              "remote host sent key %s\n"
              "I expected %s\n"
              "'monotone unset %s %s' overrides this check\n")
            % their_key_hash % expected_key_hash
            % their_key_key.first % their_key_key.second);
          E(false, F("server key changed"));
        }
    }
  else
    {
      // trust on first use: remember this key hash for future connections
      P(F("first time connecting to server %s\n"
          "I'll assume it's really them, but you might want to double-check\n"
          "their key's fingerprint: %s\n") % peer_id % their_key_hash);
      app.db.set_var(their_key_key, var_value(their_key_hash()));
    }
  if (!app.db.public_key_exists(their_key_hash))
    {
      W(F("saving public key for %s to database\n") % their_keyname);
      app.db.put_key(their_keyname, their_key_encoded);
    }

  {
    hexenc<id> hnonce;
    encode_hexenc(nonce, hnonce);
    L(FL("received 'hello' netcmd from server '%s' with nonce '%s'\n")
      % their_key_hash % hnonce);
  }

  I(app.db.public_key_exists(their_key_hash));

  // save their identity (the raw, hex-decoded key hash)
  {
    id their_key_hash_decoded;
    decode_hexenc(their_key_hash, their_key_hash_decoded);
    this->remote_peer_key_hash = their_key_hash_decoded;
  }

  // clients always include in the synchronization set every local branch
  // that the user requested (i.e. that our_matcher accepts)
  vector<string> branchnames;
  set<utf8> ok_branches;
  get_branches(app, branchnames);
  for (vector<string>::const_iterator i = branchnames.begin();
      i != branchnames.end(); i++)
    {
      if (our_matcher(*i))
        ok_branches.insert(utf8(*i));
    }
  rebuild_merkle_trees(app, ok_branches);

  setup_client_tickers();

  // a configured signing key selects the keyed 'auth' path; otherwise we
  // connect anonymously
  if (app.signing_key() != "")
    {
      // get our key pair
      keypair our_kp;
      load_key_pair(app, app.signing_key, our_kp);

      // get the hash identifier for our pubkey
      hexenc<id> our_key_hash;
      id our_key_hash_raw;
      key_hash_code(app.signing_key, our_kp.pub, our_key_hash);
      decode_hexenc(our_key_hash, our_key_hash_raw);

      // make a signature over the server's nonce with our private key
      base64<rsa_sha1_signature> sig;
      rsa_sha1_signature sig_raw;
      make_signature(app, app.signing_key, our_kp.priv, nonce(), sig);
      decode_base64(sig, sig_raw);

      // make a new nonce of our own and send off the 'auth'; that fresh
      // nonce becomes the session key once the 'auth' is queued
      queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
                     our_key_hash_raw, nonce, mk_nonce(), sig_raw(),
                     their_key_encoded);
    }
  else
    {
      queue_anonymous_cmd(this->role, our_include_pattern,
                          our_exclude_pattern, mk_nonce(), their_key_encoded);
    }
  return true;
}

bool
session::process_anonymous_cmd(protocol_role role,
                               utf8 const & their_include_pattern,
                               utf8 const & their_exclude_pattern)
{
  // Internally netsync thinks in terms of sources and sinks. Users like
  // thinking of repositories as "readonly", "readwrite", or "writeonly".
  //
  // We therefore use the read/write terminology when dealing with the UI:
  // if the user asks to run a "read only" service, this means they are
  // willing to be a source but not a sink.
  //
  // nb: The "role" here is the role the *client* wants to play
  //     so we need to check that the opposite role is allowed for us,
  //     in our this->role field.
  //

  // Client must be a sink and server must be a source (anonymous
  // read-only).

  if (role != sink_role)
    {
      W(F("rejected attempt at anonymous connection for write\n"));
      this->saved_nonce = id("");
      return false;
    }

  if (this->role != source_role && this->role != source_and_sink_role