Below is the file 'netsync.cc' from this revision. You can also download the file.

// copyright (C) 2004 graydon hoare <graydon@pobox.com>
// all rights reserved.
// licensed to the public under the terms of the GNU GPL (>= 2)
// see the file COPYING for details

#include <map>
#include <string>
#include <memory>
#include <list>
#include <deque>
#include <stack>

#include <time.h>

#include <boost/lexical_cast.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/regex.hpp>

#include "app_state.hh"
#include "cert.hh"
#include "constants.hh"
#include "keys.hh"
#include "merkle_tree.hh"
#include "netcmd.hh"
#include "netio.hh"
#include "netsync.hh"
#include "numeric_vocab.hh"
#include "packet.hh"
#include "sanity.hh"
#include "transforms.hh"
#include "ui.hh"
#include "xdelta.hh"
#include "epoch.hh"
#include "platform.hh"
#include "hmac.hh"
#include "globish.hh"

#include "botan/botan.h"

#include "netxx/address.h"
#include "netxx/peer.h"
#include "netxx/probe.h"
#include "netxx/socket.h"
#include "netxx/stream.h"
#include "netxx/streamserver.h"
#include "netxx/timeout.h"

// TODO: things to do that will break protocol compatibility
//   -- need some way to upgrade anonymous to keyed pull, without user having
//      to explicitly specify which they want
//      just having a way to respond "access denied, try again" might work
//      but perhaps better to have the anonymous command include a note "I
//      _could_ use key <...> if you prefer", and if that would lead to more
//      access, could reply "I do prefer".  (Does this lead to too much
//      information exposure?  Allows anonymous people to probe what branches
//      a key has access to.)
//   -- "warning" packet type?
//   -- Richard Levitte wants, when you (e.g.) request '*' but don't have
//      access to all of it, you just get the parts you have access to
//      (maybe with warnings about skipped branches).  to do this right,
//      should have a way for the server to send back to the client "right,
//      you're not getting the following branches: ...", so the client will
//      not include them in its merkle trie.
//   -- add some sort of vhost field to the client's first packet, saying who
//      they expect to talk to
//   -- connection teardown is flawed:
//      -- simple bug: often connections "fail" even though they succeeded.
//         should figure out why.  (Possibly one side doesn't wait for their
//         goodbye packet to drain before closing the socket?)
//      -- subtle misdesign: "goodbye" packets indicate completion of data
//         transfer.  they do not indicate that data has been written to
//         disk.  there should be some way to indicate that data has been
//         successfully written to disk.  See message (and thread)
//         <E0420553-34F3-45E8-9DA4-D8A5CB9B0600@hsdev.com> on
//         monotone-devel.
//   -- apparently we have an IANA-approved port: 4691.  I guess we should
//      switch to using that.
//      (It's registered under the name "netsync".  "monotone" would probably
//      be better, but I don't know how possible it is to change this...)

//
// this is the "new" network synchronization (netsync) system in
// monotone. it is based on synchronizing a pair of merkle trees over an
// interactive connection.
//
// a netsync process between peers treats each peer as either a source, a
// sink, or both. when a peer is only a source, it will not write any new
// items to its database. when a peer is only a sink, it will not send any
// items from its database. when a peer is both a source and sink, it may
// send and write items freely.
//
// the post-state of a netsync is that each sink contains a superset of the
// items in its corresponding source; when peers are behaving as both
// source and sink, this means that the post-state of the sync is for the
// peers to have identical item sets.
//
// a peer can be a sink in at most one netsync process at a time; it can
// however be a source for multiple netsyncs simultaneously.
//
//
// data structure
// --------------
//
// each node in a merkle tree contains a fixed number of slots. this number
// is derived from a global parameter of the protocol -- the tree fanout --
// such that the number of slots is 2^fanout. for now we will assume that
// fanout is 4 thus there are 16 slots in a node, because this makes
// illustration easier. the other parameter of the protocol is the size of
// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
//
// each slot in a merkle tree node is in one of 4 states:
//
//   - empty
//   - live leaf
//   - dead leaf
//   - subtree
//
// in addition, each live or dead leaf contains a hash code which
// identifies an element of the set being synchronized. each subtree slot
// contains a hash code of the node immediately beneath it in the merkle
// tree. empty slots contain no hash codes.
//
// each node also summarizes, for sake of statistic-gathering, the number
// of set elements and total number of bytes in all of its subtrees, each
// stored as a size_t and sent as a uleb128.
//
// since empty slots have no hash code, they are represented implicitly by
// a bitmap at the head of each merkle tree node. as an additional
// integrity check, each merkle tree node contains a label indicating its
// prefix in the tree, and a hash of its own contents.
//
// in total, then, the byte-level representation of a <160,4> merkle tree
// node is as follows:
//
//      20 bytes       - hash of the remaining bytes in the node
//       1 byte        - type of this node (manifest, file, key, mcert, fcert)
//     1-N bytes       - level of this node in the tree (0 == "root", uleb128)
//    0-20 bytes       - the prefix of this node, 4 bits * level,
//                       rounded up to a byte
//     1-N bytes       - number of leaves under this node (uleb128)
//       4 bytes       - slot-state bitmap of the node
//   0-320 bytes       - between 0 and 16 live slots in the node
//
// so, in the worst case such a node is 367 bytes, with these parameters.
//
//
// protocol
// --------
//
// The protocol is a simple binary command-packet system over TCP;
// each packet consists of a single byte which identifies the protocol
// version, a byte which identifies the command name inside that
// version, a size_t sent as a uleb128 indicating the length of the
// packet, that many bytes of payload, and finally 20 bytes of SHA-1
// HMAC calculated over the payload.  The key for the SHA-1 HMAC is 20
// bytes of 0 during authentication, and a 20-byte random key chosen
// by the client after authentication (discussed below).
// decoding involves simply buffering until a sufficient number of bytes are
// received, then advancing the buffer pointer. any time an integrity check
// (the HMAC) fails, the protocol is assumed to have lost synchronization, and
// the connection is dropped. the parties are free to drop the tcp stream at
// any point, if too much data is received or too much idle time passes; no
// commitments or transactions are made.
//
// one special command, "bye", is used to shut down a connection
// gracefully.  once each side has received all the data they want, they
// can send a "bye" command to the other side. as soon as either side has
// both sent and received a "bye" command, they drop the connection. if
// either side sees an i/o failure (dropped connection) after they have
// sent a "bye" command, they consider the shutdown successful.
//
// the exchange begins in a non-authenticated state. the server sends a
// "hello <id> <nonce>" command, which identifies the server's RSA key and
// issues a nonce which must be used for a subsequent authentication.
//
// The client then responds with either:
//
// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
// role it wishes to play in the synchronization, identifies the pattern it
// wishes to sync with, signs the previous nonce with its own key, and informs
// the server of the HMAC key it wishes to use for this session (encrypted
// with the server's public key); or
//
// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
// <hmac key>" command, which identifies the role it wishes to play in the
// synchronization, the pattern it wishes to sync with, and the HMAC key it
// wishes to use for this session (also encrypted with the server's public
// key).
//
// The server then replies with a "confirm" command, which contains no
// other data but will only have the correct HMAC integrity code if
// the server received and properly decrypted the HMAC key offered by
// the client.  This transitions the peers into an authenticated state
// and begins refinement.
//
// refinement begins with the client sending its root public key and
// manifest certificate merkle nodes to the server. the server then
// compares the root to each slot in *its* root node, and for each slot
// either sends refined subtrees to the client, or (if it detects a missing
// item in one pattern or the other) sends either "data" or "send_data"
// commands corresponding to the role of the missing item (source or
// sink). the client then receives each refined subtree and compares it
// with its own, performing similar description/request behavior depending
// on role, and the cycle continues.
//
// detecting the end of refinement is subtle: after sending the refinement
// of the root node, the server sends a "done 0" command (queued behind all
// the other refinement traffic). when either peer receives a "done N"
// command it immediately responds with a "done N+1" command. when two done
// commands for a given merkle tree arrive with no intervening refinements,
// the entire merkle tree is considered complete.
//
// any "send_data" command received prompts a "data" command in response,
// if the requested item exists. if an item does not exist, a "nonexistant"
// response command is sent.
//
// once a response is received for each requested key and revision cert
// (either data or nonexistant) the requesting party walks the graph of
// received revision certs and transmits send_data or send_delta commands
// for all the revisions mentioned in the certs which it does not already
// have in its database.
//
// for each revision it receives, the recipient requests all the file data or
// deltas described in that revision.
//
// once all requested files, manifests, revisions and certs are received (or
// noted as nonexistant), the recipient closes its connection.
//
// (aside: this protocol is raw binary because coding density is actually
// important here, and each packet consists of very information-dense
// material that you wouldn't have a hope of typing in manually anyways)
//

using namespace std;
using boost::shared_ptr;
using boost::lexical_cast;

// decoding guard: does nothing when 'check' holds; otherwise throws a
// bad_decode whose message names the failed check via 'context'.
static inline void
require(bool check, string const & context)
{
  if (check)
    return;

  throw bad_decode(F("check of '%s' failed") % context);
}

// per-merkle-tree record of how far "done" processing has got. both flags
// start out false.
struct done_marker
{
  bool current_level_had_refinements;
  bool tree_is_done;

  done_marker()
    : current_level_had_refinements(false),
      tree_is_done(false)
  {
  }
};

// all the state belonging to one netsync connection with a single peer: the
// socket and its i/o buffers, the authentication / HMAC state, the
// bookkeeping for merkle refinement and item requests, and one queue_* /
// process_* member per protocol command.
//
// NOTE: the order of the data members below is the order in which they are
// constructed; the constructor's initializer list is written to match it.
struct
session
{
  // the part we play in this sync; setup_client_tickers() compares it with
  // sink_role, source_role and source_and_sink_role.
  protocol_role role;
  protocol_voice const voice;
  // NOTE: the two patterns are held by reference, not copied, so the utf8
  // objects handed to the constructor must outlive the session.
  utf8 const & our_include_pattern;
  utf8 const & our_exclude_pattern;
  // built by the constructor from the two patterns above
  globish_matcher our_matcher;
  app_state & app;

  // the constructor's 'peer' argument, stored as given
  string peer_id;
  // the socket handle, and the stream the constructor builds on top of it
  Netxx::socket_type fd;
  Netxx::Stream str;

  string_queue inbuf;
  // deque of pair<string data, size_t cur_pos>
  deque< pair<string,size_t> > outbuf;
  // the total data stored in outbuf - this is
  // used as a valve to stop too much data
  // backing up
  size_t outbuf_size;

  // NOTE(review): presumably the netcmd currently being assembled from
  // inbuf, with 'armed' recording whether it is complete -- arm() is defined
  // outside the part of the file visible here; confirm.
  netcmd cmd;
  bool armed;
  bool arm();

  // identity of the remote key; both are constructed empty
  id remote_peer_key_hash;
  rsa_keypair_id remote_peer_key_name;
  // the session key and both HMAC chains start out from
  // constants::netsync_key_initializer and are re-keyed together by
  // set_session_key().
  netsync_session_key session_key;
  chained_hmac read_hmac;
  chained_hmac write_hmac;
  // starts out false
  bool authenticated;

  // set to the current time on construction and by mark_recent_io()
  time_t last_io_time;
  // progress tickers; all NULL until setup_client_tickers() creates the
  // ones relevant to our role, so every user must check for NULL.
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;
  auto_ptr<ticker> revision_checked_ticker;

  // filled in by the *_written_callback members below and consumed by the
  // destructor, which reports them through the lua netsync hooks.
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  map<netcmd_item_type, boost::shared_ptr<merkle_table> > merkle_tables;

  // the constructor creates entries for cert, key and epoch items
  map<netcmd_item_type, done_marker> done_refinements;
  // requested_items / received_items: the constructor creates an (empty)
  // set for each of cert, key, revision, manifest, file and epoch items.
  // note_item_arrived() moves an id from the former to the latter.
  map<netcmd_item_type, boost::shared_ptr< set<id> > > requested_items;
  map<netcmd_item_type, boost::shared_ptr< set<id> > > received_items;
  // the constructor creates entries for manifest and file items only
  map<netcmd_item_type, boost::shared_ptr< set<id> > > full_delta_items;
  map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestry;
  map<revision_id, map<cert_name, vector<cert> > > received_certs;
  set< pair<id, id> > reverse_delta_requests;
  // starts out false
  bool analyzed_ancestry;

  // empty until mk_nonce() fills it in; mk_nonce() insists it is empty
  // beforehand, so it can only be called once per session.
  id saved_nonce;
  // both start out false
  bool received_goodbye;
  bool sent_goodbye;

  // database writer with a valve; the constructor registers the three
  // *_written_callback members on it, and maybe_note_epochs_finished()
  // opens the valve.
  packet_db_valve dbw;

  session(protocol_role role,
          protocol_voice voice,
          utf8 const & our_include_pattern,
          utf8 const & our_exclude_pattern,
          app_state & app,
          string const & peer,
          Netxx::socket_type sock,
          Netxx::Timeout const & to);

  // reports everything recorded in the written_* vectors to the lua hooks
  virtual ~session();

  // callbacks registered on dbw by the constructor
  void rev_written_callback(revision_id rid);
  void key_written_callback(rsa_keypair_id kid);
  void cert_written_callback(cert const & c);

  id mk_nonce();
  void mark_recent_io();

  // the second overload first decrypts the key with our private signing key
  void set_session_key(string const & key);
  void set_session_key(rsa_oaep_sha_data const & key_encrypted);

  void setup_client_tickers();

  bool done_all_refinements();
  bool cert_refinement_done();
  bool all_requested_revisions_received();

  // bookkeeping on requested_items / received_items / full_delta_items
  void note_item_requested(netcmd_item_type ty, id const & i);
  void note_item_full_delta(netcmd_item_type ty, id const & ident);
  bool item_already_requested(netcmd_item_type ty, id const & i);
  void note_item_arrived(netcmd_item_type ty, id const & i);

  void maybe_note_epochs_finished();

  void note_item_sent(netcmd_item_type ty, id const & i);

  bool got_all_data();
  void maybe_say_goodbye();

  void get_heads_and_consume_certs(set<revision_id> & heads);

  void analyze_ancestry_graph();
  void analyze_manifest(manifest_map const & man);

  Netxx::Probe::ready_type which_events() const;
  bool read_some();
  bool write_some();

  // set by error(); while it is set, write_netcmd_and_try_flush() drops
  // outgoing commands instead of queueing them.
  bool encountered_error;
  void error(string const & errmsg);

  // queue_*: one member per outgoing protocol command
  void write_netcmd_and_try_flush(netcmd const & cmd);
  void queue_bye_cmd();
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(size_t level, netcmd_item_type type);
  void queue_hello_cmd(id const & server,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           utf8 const & include_pattern,
                           utf8 const & exclude_pattern,
                           id const & nonce2,
                           base64<rsa_pub_key> server_key_encoded);
  void queue_auth_cmd(protocol_role role,
                      utf8 const & include_pattern,
                      utf8 const & exclude_pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      string const & signature,
                      base64<rsa_pub_key> server_key_encoded);
  void queue_confirm_cmd();
  void queue_refine_cmd(merkle_node const & node);
  void queue_send_data_cmd(netcmd_item_type type,
                           id const & item);
  void queue_send_delta_cmd(netcmd_item_type type,
                            id const & base,
                            id const & ident);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);
  void queue_nonexistant_cmd(netcmd_item_type type,
                             id const & item);

  // process_*: one member per incoming protocol command
  bool process_bye_cmd();
  bool process_error_cmd(string const & errmsg);
  bool process_done_cmd(size_t level, netcmd_item_type type);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_anonymous_cmd(protocol_role role,
                             utf8 const & their_include_pattern,
                             utf8 const & their_exclude_pattern);
  bool process_auth_cmd(protocol_role role,
                        utf8 const & their_include_pattern,
                        utf8 const & their_exclude_pattern,
                        id const & client,
                        id const & nonce1,
                        string const & signature);
  bool process_confirm_cmd(string const & signature);
  void respond_to_confirm_cmd();
  bool process_refine_cmd(merkle_node const & node);
  bool process_send_data_cmd(netcmd_item_type type,
                             id const & item);
  bool process_send_delta_cmd(netcmd_item_type type,
                              id const & base,
                              id const & ident);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_nonexistant_cmd(netcmd_item_type type,
                               id const & item);

  // access to the merkle trees held in merkle_tables
  bool merkle_node_exists(netcmd_item_type type,
                          size_t level,
                          prefix const & pref);

  void load_merkle_node(netcmd_item_type type,
                        size_t level,
                        prefix const & pref,
                        merkle_ptr & node);

  void rebuild_merkle_trees(app_state & app,
                            set<utf8> const & branches);

  bool dispatch_payload(netcmd const & cmd);
  void begin_service();
  bool process();
};

// helper that works on behalf of one session (held by reference, so the
// session must outlive the fetcher). judging by its interface it first
// walks the ancestry graph, recording file and manifest delta edges in the
// multimaps below, and then requests the corresponding data.
// NOTE(review): the constructor and all member functions are defined
// outside the part of the file visible here -- confirm the details there.
struct
ancestry_fetcher
{
  session & sess;

  // map children to parents
  multimap< file_id, file_id > rev_file_deltas;
  multimap< manifest_id, manifest_id > rev_manifest_deltas;
  // map an ancestor to a child
  multimap< file_id, file_id > fwd_file_deltas;
  multimap< manifest_id, manifest_id > fwd_manifest_deltas;

  // NOTE(review): presumably the files already encountered during
  // traversal -- confirm.
  set< file_id > seen_files;

  ancestry_fetcher(session & s);
  // analysing the ancestry graph
  void traverse_files(change_set const & cset);
  void traverse_manifest(manifest_id const & child_man,
                         manifest_id const & parent_man);
  void traverse_ancestry(set<revision_id> const & heads);

  // requesting the data
  void request_rev_file_deltas(file_id const & start,
                               set<file_id> & done_files);
  void request_files();
  void request_rev_manifest_deltas(manifest_id const & start,
                                   set<manifest_id> & done_manifests);
  void request_manifests();


};


// wrapper around the root prefix, which is built from the empty string.
struct root_prefix
{
  prefix val;

  root_prefix()
    : val("")
  {
  }
};

// returns the one shared root_prefix object.
//
// the object is deliberately a function-local static (constructed the first
// time this function is called) rather than a namespace-scope static, for a
// bizarre reason: mac OSX runs static initializers in the "wrong" order
// (application before libraries), so the initializer for a static string in
// cryptopp runs after the initializer for a namespace-scope static defined
// here. encode_hexenc() would therefore fail inside such an initializer and
// the program would crash. curious, eh?
static root_prefix const &
get_root_prefix()
{
  static root_prefix the_root_prefix;
  return the_root_prefix;
}

// a default-constructed file_id.
// NOTE(review): presumably used further down this file as the "no such
// version" sentinel -- its users are outside the visible part; confirm.
static file_id null_ident;

// sets up a fresh, unauthenticated session on socket 'sock': both HMAC
// chains and the session key start from constants::netsync_key_initializer,
// no tickers exist yet, and every bookkeeping table gets its (empty)
// per-item-type entries.
//
// NOTE: our_include_pattern and our_exclude_pattern are bound to reference
// members, so the caller's objects must outlive the session.
session::session(protocol_role role,
                 protocol_voice voice,
                 utf8 const & our_include_pattern,
                 utf8 const & our_exclude_pattern,
                 app_state & app,
                 string const & peer,
                 Netxx::socket_type sock,
                 Netxx::Timeout const & to) :
  role(role),
  voice(voice),
  our_include_pattern(our_include_pattern),
  our_exclude_pattern(our_exclude_pattern),
  our_matcher(our_include_pattern, our_exclude_pattern),
  app(app),
  peer_id(peer),
  fd(sock),
  str(sock, to),
  inbuf(),
  outbuf_size(0),
  armed(false),
  remote_peer_key_hash(""),
  remote_peer_key_name(""),
  session_key(constants::netsync_key_initializer),
  read_hmac(constants::netsync_key_initializer),
  write_hmac(constants::netsync_key_initializer),
  authenticated(false),
  last_io_time(::time(NULL)),
  byte_in_ticker(NULL),
  byte_out_ticker(NULL),
  cert_in_ticker(NULL),
  cert_out_ticker(NULL),
  revision_in_ticker(NULL),
  revision_out_ticker(NULL),
  revision_checked_ticker(NULL),
  analyzed_ancestry(false),
  saved_nonce(""),
  received_goodbye(false),
  sent_goodbye(false),
  dbw(app, true),
  encountered_error(false)
{
  // have the database valve tell us about everything it writes, so the
  // destructor can report it to the lua hooks.
  dbw.set_on_revision_written(boost::bind(&session::rev_written_callback,
                                          this, _1));
  dbw.set_on_cert_written(boost::bind(&session::cert_written_callback,
                                      this, _1));
  dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback,
                                        this, _1));

  typedef boost::shared_ptr< set<id> > id_set_ptr;

  // item types that take part in merkle refinement
  netcmd_item_type const refined[] =
    { cert_item, key_item, epoch_item };
  // item types that can be requested and received
  netcmd_item_type const transferred[] =
    { cert_item, key_item, revision_item,
      manifest_item, file_item, epoch_item };
  // item types tracked in full_delta_items
  netcmd_item_type const full_delta[] =
    { manifest_item, file_item };

  size_t const n_refined = sizeof(refined) / sizeof(refined[0]);
  size_t const n_transferred = sizeof(transferred) / sizeof(transferred[0]);
  size_t const n_full_delta = sizeof(full_delta) / sizeof(full_delta[0]);

  for (size_t n = 0; n < n_refined; ++n)
    done_refinements.insert(make_pair(refined[n], done_marker()));

  for (size_t n = 0; n < n_transferred; ++n)
    requested_items.insert(make_pair(transferred[n],
                                     id_set_ptr(new set<id>())));

  for (size_t n = 0; n < n_transferred; ++n)
    received_items.insert(make_pair(transferred[n],
                                    id_set_ptr(new set<id>())));

  for (size_t n = 0; n < n_full_delta; ++n)
    full_delta_items.insert(make_pair(full_delta[n],
                                      id_set_ptr(new set<id>())));
}

// on teardown, tells the lua hooks about everything that was written to the
// database during this session: first the public keys, then each revision
// together with the certs that arrived for it, and finally the certs that
// do not belong to any revision written in this session.
session::~session()
{
  typedef map<revision_id, vector<cert> > rev_cert_map;

  vector<cert> loose_certs;
  rev_cert_map certs_by_rev;

  // one (initially empty) bucket per revision written in this session
  for (vector<revision_id>::iterator rev = written_revisions.begin();
       rev != written_revisions.end(); ++rev)
    certs_by_rev.insert(make_pair(*rev, vector<cert>()));

  // sort every written cert into its revision's bucket, or set it aside if
  // its revision was not written in this session
  for (vector<cert>::iterator c = written_certs.begin();
       c != written_certs.end(); ++c)
    {
      rev_cert_map::iterator bucket = certs_by_rev.find(c->ident);
      if (bucket != certs_by_rev.end())
        bucket->second.push_back(*c);
      else
        loose_certs.push_back(*c);
    }

  // keys
  for (vector<rsa_keypair_id>::iterator key = written_keys.begin();
       key != written_keys.end(); ++key)
    app.lua.hook_note_netsync_pubkey_received(*key);

  // revisions, each with its decoded certs
  for (vector<revision_id>::iterator rev = written_revisions.begin();
       rev != written_revisions.end(); ++rev)
    {
      vector<cert> & attached(certs_by_rev[*rev]);
      set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > decoded;
      for (vector<cert>::const_iterator c = attached.begin();
           c != attached.end(); ++c)
        {
          cert_value val;
          decode_base64(c->value, val);
          decoded.insert(make_pair(c->key, make_pair(c->name, val)));
        }
      app.lua.hook_note_netsync_revision_received(*rev, decoded);
    }

  // certs not attached to a new revision
  for (vector<cert>::iterator c = loose_certs.begin();
       c != loose_certs.end(); ++c)
    {
      cert_value val;
      decode_base64(c->value, val);
      app.lua.hook_note_netsync_cert_received(c->ident, c->key,
                                              c->name, val);
    }
}

// called by dbw for each revision written: bumps the "revs written" ticker
// if there is one, and remembers the revision for the destructor's hooks.
void session::rev_written_callback(revision_id rid)
{
  if (revision_checked_ticker.get() != NULL)
    ++(*revision_checked_ticker);

  this->written_revisions.push_back(rid);
}

// called by dbw for each public key written: remembers the key so the
// destructor can report it.
void session::key_written_callback(rsa_keypair_id kid)
{
  this->written_keys.push_back(kid);
}

// called by dbw for each cert written: remembers the cert so the destructor
// can report it.
void session::cert_written_callback(cert const & c)
{
  this->written_certs.push_back(c);
}

// generates merkle_hash_length_in_bytes random bytes, stores them as this
// session's saved nonce and returns them. the nonce must not have been set
// before, so this can be called only once per session.
id
session::mk_nonce()
{
  I(this->saved_nonce().size() == 0);

  char random_bytes[constants::merkle_hash_length_in_bytes];
  Botan::byte * const raw = reinterpret_cast<Botan::byte *>(random_bytes);
  Botan::Global_RNG::randomize(raw, constants::merkle_hash_length_in_bytes);

  this->saved_nonce = string(random_bytes,
                             random_bytes + constants::merkle_hash_length_in_bytes);
  I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);

  return this->saved_nonce;
}

// stamps the session with the current time as its last i/o time.
void
session::mark_recent_io()
{
  time_t const now = ::time(NULL);
  last_io_time = now;
}

// installs 'key' as the session key and re-keys both the read and the write
// HMAC chain with it.
void
session::set_session_key(string const & key)
{
  this->session_key = netsync_session_key(key);

  this->read_hmac.set_key(this->session_key);
  this->write_hmac.set_key(this->session_key);
}

// same as the string overload, but for a key that arrives RSA-encrypted: it
// is first decrypted with our private signing key (app.signing_key).
void
session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
{
  base64< arc4<rsa_priv_key> > signing_priv;
  load_priv_key(app, app.signing_key, signing_priv);

  string decrypted_key;
  decrypt_rsa(app.lua, app.signing_key, signing_priv,
              hmac_key_encrypted, decrypted_key);

  set_session_key(decrypted_key);
}

// creates the progress tickers. the two byte counters are always created;
// which cert / revision tickers are added depends on our role.
void
session::setup_client_tickers()
{
  // xgettext: please use short message and try to avoid multibytes chars
  byte_in_ticker.reset(new ticker(_("bytes in"), ">", 1024, true));
  // xgettext: please use short message and try to avoid multibytes chars
  byte_out_ticker.reset(new ticker(_("bytes out"), "<", 1024, true));

  switch (role)
    {
    case sink_role:
      // xgettext: please use short message and try to avoid multibytes chars
      revision_checked_ticker.reset(new ticker(_("revs written"), "w", 1));
      // xgettext: please use short message and try to avoid multibytes chars
      cert_in_ticker.reset(new ticker(_("certs in"), "c", 3));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_in_ticker.reset(new ticker(_("revs in"), "r", 1));
      break;

    case source_role:
      // xgettext: please use short message and try to avoid multibytes chars
      cert_out_ticker.reset(new ticker(_("certs out"), "C", 3));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_out_ticker.reset(new ticker(_("revs out"), "R", 1));
      break;

    default:
      // the only role left is "both"; no cert tickers are created for it
      I(role == source_and_sink_role);
      // xgettext: please use short message and try to avoid multibytes chars
      revision_checked_ticker.reset(new ticker(_("revs written"), "w", 1));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_in_ticker.reset(new ticker(_("revs in"), "r", 1));
      // xgettext: please use short message and try to avoid multibytes chars
      revision_out_ticker.reset(new ticker(_("revs out"), "R", 1));
      break;
    }
}

bool
session::done_all_refinements()
{
  bool all = true;
  for (map<netcmd_item_type, done_marker>::const_iterator j =
         done_refinements.begin(); j != done_refinements.end(); ++j)
    {
      if (j->second.tree_is_done == false)
        all = false;
    }
  return all;
}


bool
session::cert_refinement_done()
{
  return done_refinements[cert_item].tree_is_done;
}

bool
session::got_all_data()
{
  for (map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i =
         requested_items.begin(); i != requested_items.end(); ++i)
    {
      if (! i->second->empty())
        return false;
    }
  return true;
}

bool
session::all_requested_revisions_received()
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(revision_item);
  I(i != requested_items.end());
  return i->second->empty();
}

void
session::maybe_note_epochs_finished()
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(epoch_item);
  I(i != requested_items.end());
  // Maybe there are outstanding epoch requests.
  if (!i->second->empty())
    return;
  // And maybe we haven't even finished the refinement.
  if (!done_refinements[epoch_item].tree_is_done)
    return;
  // But otherwise, we're ready to go!
  L(F("all epochs processed, opening database valve\n"));
  this->dbw.open_valve();
}

// records 'ident' as an outstanding request for item type 'ty'. the type
// must have an entry in requested_items.
void
session::note_item_requested(netcmd_item_type ty, id const & ident)
{
  typedef map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    table_iter;

  table_iter i = requested_items.find(ty);
  I(i != requested_items.end());

  set<id> & outstanding = *(i->second);
  outstanding.insert(ident);
}

// records 'ident' in the full_delta_items set for item type 'ty'. the type
// must have an entry in full_delta_items.
void
session::note_item_full_delta(netcmd_item_type ty, id const & ident)
{
  typedef map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    table_iter;

  table_iter i = full_delta_items.find(ty);
  I(i != full_delta_items.end());

  set<id> & full_deltas = *(i->second);
  full_deltas.insert(ident);
}

// moves 'ident' from the requested set to the received set for item type
// 'ty', and bumps the matching "in" ticker if one exists (only certs and
// revisions have one).
void
session::note_item_arrived(netcmd_item_type ty, id const & ident)
{
  typedef map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    table_iter;

  table_iter i = requested_items.find(ty);
  I(i != requested_items.end());
  i->second->erase(ident);

  table_iter j = received_items.find(ty);
  I(j != received_items.end());
  j->second->insert(ident);

  if (ty == cert_item)
    {
      if (cert_in_ticker.get() != NULL)
        ++(*cert_in_ticker);
    }
  else if (ty == revision_item)
    {
      if (revision_in_ticker.get() != NULL)
        ++(*revision_in_ticker);
    }
  // no ticker for other things.
}

// true when 'ident' is either still outstanding or already received for
// item type 'ty'. the received table is only consulted when the item is not
// outstanding.
bool
session::item_already_requested(netcmd_item_type ty, id const & ident)
{
  typedef map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    table_iter;

  table_iter i = requested_items.find(ty);
  I(i != requested_items.end());
  if (i->second->count(ident) != 0)
    return true;

  i = received_items.find(ty);
  I(i != received_items.end());
  return i->second->count(ident) != 0;
}


// bumps the "out" ticker matching item type 'ty', if that ticker exists.
// only certs and revisions have one; 'ident' is not used.
void
session::note_item_sent(netcmd_item_type ty, id const & ident)
{
  ticker * out_ticker = NULL;

  if (ty == cert_item)
    out_ticker = cert_out_ticker.get();
  else if (ty == revision_item)
    out_ticker = revision_out_ticker.get();
  // no ticker for other things.

  if (out_ticker != NULL)
    ++(*out_ticker);
}

// serializes 'cmd' (chaining it into write_hmac) and appends it to outbuf,
// keeping outbuf_size in step. once an error has been flagged, the command
// is dropped instead and only a log line is written.
void
session::write_netcmd_and_try_flush(netcmd const & cmd)
{
  if (encountered_error)
    {
      L(F("dropping outgoing netcmd (because we're in error unwind mode)\n"));
      return;
    }

  string wire;
  cmd.write(wire, write_hmac);
  outbuf.push_back(make_pair(wire, 0));
  outbuf_size += wire.size();

  // FIXME: this helps keep the protocol pipeline full but it seems to
  // interfere with initial and final sequences. careful with it.
  // write_some();
  // read_some();
}

// This method triggers a special "error unwind" mode to netsync.  In this
// mode, all received data is ignored, and no new data is queued.  We simply
// stay connected long enough for the current write buffer to be flushed, to
// ensure that our peer receives the error message.
// WARNING WARNING WARNING (FIXME): this does _not_ throw an exception.  if
// while processing any given netcmd packet you encounter an error, you can
// _only_ call this method if you have not touched the database, because if
// you have touched the database then you need to throw an exception to
// trigger a rollback.
// you could, of course, call this method and then throw an exception, but
// there is no point in doing that, because throwing the exception will cause
// the connection to be immediately terminated, so your call to error() will
// actually have no effect (except to cause your error message to be printed
// twice).
void
session::error(std::string const & errmsg)
{
  // Report the message locally first.
  W(F("error: %s\n") % errmsg);
  // The error command must be queued *before* the flag is raised: once
  // encountered_error is set, write_netcmd_and_try_flush() drops every
  // outgoing netcmd, so the opposite order would discard this message too.
  queue_error_cmd(errmsg);
  encountered_error = true;
}

// Walk every entry of MAN and queue a 'send data' request for each file
// version that the local database does not have yet.
void
session::analyze_manifest(manifest_map const & man)
{
  L(F("analyzing %d entries in manifest\n") % man.size());

  manifest_map::const_iterator entry = man.begin();
  manifest_map::const_iterator const last = man.end();
  for (; entry != last; ++entry)
    {
      // Nothing to fetch for files we already store.
      if (this->app.db.file_version_exists(manifest_entry_id(entry)))
        continue;

      // Requests carry the decoded id, not its hex encoding.
      id raw_ident;
      decode_hexenc(manifest_entry_id(entry).inner(), raw_ident);
      queue_send_data_cmd(file_item, raw_ident);
    }
}

// Convert a manifest id into a plain id by hex-decoding its inner value.
inline static id
plain_id(manifest_id const & i)
{
  hexenc<id> encoded(i.inner());
  id decoded;
  decode_hexenc(encoded, decoded);
  return decoded;
}

// Convert a file id into a plain id by hex-decoding its inner value.
inline static id
plain_id(file_id const & i)
{
  hexenc<id> encoded(i.inner());
  id decoded;
  decode_hexenc(encoded, decoded);
  return decoded;
}

// Work out which of the received revisions are acceptable heads, prune the
// received certs accordingly, and hand the surviving certs to the database
// writer.
//
// HEADS is an output parameter: accepted heads are inserted into it (it is
// not cleared first).  Every head inserted is a key of `ancestry`.
//
// The procedure is:
//   0) the initial candidates are the received revisions that are not the
//      parent of any other received revision;
//   1) branch certs naming a branch rejected by our_matcher are discarded;
//   2) candidates left without any branch cert are rejected;
//   3) rejecting a revision may expose its parents: a parent whose received
//      children have all been rejected becomes a head if it carries a
//      branch cert, and is rejected in turn otherwise.  All certs of a
//      rejected revision are discarded.
void
session::get_heads_and_consume_certs( set<revision_id> & heads )
{
  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
  typedef map<cert_name, vector<cert> > cert_map;

  set<revision_id> nodes, parents;
  // chld_num[r] counts the received revisions that have r as a parent.
  map<revision_id, int> chld_num;
  L(F("analyzing %d ancestry edges\n") % ancestry.size());

  for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
    {
      nodes.insert(i->first);
      for (edge_map::const_iterator j = i->second->second.edges.begin();
           j != i->second->second.edges.end(); ++j)
        {
          parents.insert(edge_old_revision(j));
          map<revision_id, int>::iterator n;
          n = chld_num.find(edge_old_revision(j));
          if (n == chld_num.end())
            chld_num.insert(make_pair(edge_old_revision(j), 1));
          else
            ++(n->second);
        }
    }

  // Initial heads: received revisions that nobody names as a parent.
  set_difference(nodes.begin(), nodes.end(),
                 parents.begin(), parents.end(),
                 inserter(heads, heads.begin()));

  L(F("intermediate set_difference heads size %d") % heads.size());

  // Write permissions checking:
  // remove heads w/o proper certs, add their parents to heads
  // 1) remove unwanted branch certs from consideration
  // 2) remove heads w/o a branch tag, process new exposed heads
  // 3) repeat 2 until no change

  //1
  // ok_branches / bad_branches cache the matcher's verdict per branch name
  // so that our_matcher runs (and the warning prints) once per branch.
  set<string> ok_branches, bad_branches;
  cert_name bcert_name(branch_cert_name);
  cert_name tcert_name(tag_cert_name);
  for (map<revision_id, cert_map>::iterator i = received_certs.begin();
      i != received_certs.end(); ++i)
    {
      //branches
      vector<cert> & bcerts(i->second[bcert_name]);
      vector<cert> keeping;
      for (vector<cert>::iterator j = bcerts.begin(); j != bcerts.end(); ++j)
        {
          cert_value name;
          decode_base64(j->value, name);
          if (ok_branches.find(name()) != ok_branches.end())
            keeping.push_back(*j);
          else if (bad_branches.find(name()) != bad_branches.end())
            ;
          else
            {
              if (our_matcher(name()))
                {
                  ok_branches.insert(name());
                  keeping.push_back(*j);
                }
              else
                {
                  bad_branches.insert(name());
                  W(F("Dropping branch certs for unwanted branch %s")
                    % name);
                }
            }
        }
      bcerts = keeping;
    }
  //2
  // tmp is the work list of rejected revisions whose parents still have to
  // be examined.
  list<revision_id> tmp;
  for (set<revision_id>::iterator i = heads.begin(); i != heads.end(); ++i)
    {
      if (!received_certs[*i][bcert_name].size())
        tmp.push_back(*i);
    }
  for (list<revision_id>::iterator i = tmp.begin(); i != tmp.end(); ++i)
    heads.erase(*i);

  L(F("after step 2, heads size %d") % heads.size());
  //3
  while (tmp.size())
    {
      ancestryT::const_iterator i = ancestry.find(tmp.front());
      if (i != ancestry.end())
        {
          for (edge_map::const_iterator j = i->second->second.edges.begin();
               j != i->second->second.edges.end(); ++j)
            {
              revision_id const & parent = edge_old_revision(j);
              // Only once every received child of the parent has been
              // rejected does the parent itself become exposed.
              if (!--chld_num[parent])
                {
                  // The decision concerns the *parent*: it is a new head
                  // if we received it and it has a wanted branch cert,
                  // otherwise it is rejected as well.  (A parent that is
                  // not in ancestry is a no-op when popped from tmp.)
                  if (ancestry.find(parent) != ancestry.end()
                      && received_certs[parent][bcert_name].size())
                    heads.insert(parent);
                  else
                    tmp.push_back(parent);
                }
            }
          // since we don't want this rev, we don't want its certs either
          received_certs[tmp.front()] = cert_map();
        }
      tmp.pop_front();
    }

  L(F("after step 3, heads size %d") % heads.size());
  // We've reduced the certs to those we want now, send them to dbw.
  for (map<revision_id, cert_map>::iterator i = received_certs.begin();
      i != received_certs.end(); ++i)
    {
      for (cert_map::iterator j = i->second.begin();
          j != i->second.end(); ++j)
        {
          for (vector<cert>::iterator k = j->second.begin();
              k != j->second.end(); ++k)
            {
              this->dbw.consume_revision_cert(revision<cert>(*k));
            }
        }
    }
}

// Run the ancestry fetcher over the received revisions, but only once, and
// only after all requested revisions have arrived and cert refinement has
// finished.
void
session::analyze_ancestry_graph()
{
  L(F("analyze_ancestry_graph"));

  // Still waiting on revisions or on cert refinement: try again later.
  if (!all_requested_revisions_received() || !cert_refinement_done())
    {
      L(F("not all done in analyze_ancestry_graph"));
      return;
    }

  // The analysis is a one-shot operation.
  if (analyzed_ancestry)
    {
      L(F("already analyzed_ancestry in analyze_ancestry_graph"));
      return;
    }

  L(F("analyze_ancestry_graph fetching"));

  // Constructing the fetcher performs the work; the flag is raised only
  // after construction has completed.
  ancestry_fetcher fetcher(*this);

  analyzed_ancestry = true;
}

// Compute the set of socket events this session wants to be probed for.
// Out-of-band data is always of interest; writability only while output is
// queued; readability only while the input buffer is below the maximum
// netcmd size.
Netxx::Probe::ready_type
session::which_events() const
{
  Netxx::Probe::ready_type wanted = Netxx::Probe::ready_oobd;

  if (!outbuf.empty())
    wanted |= Netxx::Probe::ready_write;

  if (inbuf.size() < constants::netcmd_maxsz)
    wanted |= Netxx::Probe::ready_read;

  return wanted;
}

bool
session::read_some()
{
  I(inbuf.size() < constants::netcmd_maxsz);
  char tmp[constants::bufsz];
  Netxx::signed_size_type count = str.read(tmp, sizeof(tmp));
  if (count > 0)
    {
      L(F("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id);
      if (encountered_error)
        {
          L(F("in error unwind mode, so throwing them into the bit bucket\n"));
          return true;
        }
      inbuf.append(tmp,count);
      mark_recent_io();
      if (byte_in_ticker.get() != NULL)
        (*byte_in_ticker) += count;
      return true;
    }
  else
    return false;
}

// Write (part of) the first queued output entry to the stream, at most
// constants::bufsz bytes at a time.  Returns false when nothing could be
// written, or when the queue has just been drained in error unwind mode
// (meaning it is time to disconnect); returns true otherwise.
bool
session::write_some()
{
  I(!outbuf.empty());

  // Each entry is (bytes, offset of the first byte not yet written).
  size_t remaining = outbuf.front().first.size() - outbuf.front().second;
  Netxx::signed_size_type count
    = str.write(outbuf.front().first.data() + outbuf.front().second,
                std::min(remaining, constants::bufsz));
  if (count <= 0)
    return false;

  if (static_cast<size_t>(count) != remaining)
    {
      // Partial write: remember how far we got within this entry.
      outbuf.front().second += count;
    }
  else
    {
      // The entry has been sent completely; retire it.
      outbuf_size -= outbuf.front().first.size();
      outbuf.pop_front();
    }

  L(F("wrote %d bytes to fd %d (peer %s)\n")
    % count % fd % peer_id);
  mark_recent_io();
  if (byte_out_ticker.get() != NULL)
    (*byte_out_ticker) += count;

  if (encountered_error && outbuf.empty())
    {
      // we've flushed our error message, so it's time to get out.
      L(F("finished flushing output queue in error unwind mode, disconnecting\n"));
      return false;
    }
  return true;
}

// senders

void
session::queue_bye_cmd()
{
  L(F("queueing 'bye' command\n"));
  netcmd cmd;
  cmd.write_bye_cmd();
  write_netcmd_and_try_flush(cmd);
  this->sent_goodbye = true;
}

// Queue an 'error' netcmd carrying ERRMSG for the peer.  Sending an error
// also counts as having said goodbye.
void
session::queue_error_cmd(string const & errmsg)
{
  L(F("queueing 'error' command\n"));
  netcmd failure;
  failure.write_error_cmd(errmsg);
  write_netcmd_and_try_flush(failure);
  sent_goodbye = true;
}

void
session::queue_done_cmd(size_t level,
                        netcmd_item_type type)
{
  string typestr;
  netcmd_item_type_to_string(type, typestr);
  L(F("queueing 'done' command for %s level %s\n") % typestr % level);
  netcmd cmd;
  cmd