// Below is the file 'netsync.cc' from this revision. You can also download the file.

// copyright (C) 2004 graydon hoare <graydon@pobox.com>
// all rights reserved.
// licensed to the public under the terms of the GNU GPL (>= 2)
// see the file COPYING for details

#include <map>
#include <string>
#include <memory>
#include <list>

#include <time.h>

#include <boost/lexical_cast.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/regex.hpp>

#include "app_state.hh"
#include "cert.hh"
#include "constants.hh"
#include "keys.hh"
#include "merkle_tree.hh"
#include "netcmd.hh"
#include "netio.hh"
#include "netsync.hh"
#include "numeric_vocab.hh"
#include "packet.hh"
#include "sanity.hh"
#include "transforms.hh"
#include "ui.hh"
#include "xdelta.hh"
#include "epoch.hh"
#include "platform.hh"

#include "cryptopp/osrng.h"

#include "netxx/address.h"
#include "netxx/peer.h"
#include "netxx/probe.h"
#include "netxx/socket.h"
#include "netxx/stream.h"
#include "netxx/streamserver.h"
#include "netxx/timeout.h"

//
// this is the "new" network synchronization (netsync) system in
// monotone. it is based on synchronizing a pair of merkle trees over an
// interactive connection.
//
// a netsync process between peers treats each peer as either a source, a
// sink, or both. when a peer is only a source, it will not write any new
// items to its database. when a peer is only a sink, it will not send any
// items from its database. when a peer is both a source and sink, it may
// send and write items freely.
//
// the post-state of a netsync is that each sink contains a superset of the
// items in its corresponding source; when peers are behaving as both
// source and sink, this means that the post-state of the sync is for the
// peers to have identical item sets.
//
// a peer can be a sink in at most one netsync process at a time; it can
// however be a source for multiple netsyncs simultaneously.
//
//
// data structure
// --------------
//
// each node in a merkle tree contains a fixed number of slots. this number
// is derived from a global parameter of the protocol -- the tree fanout --
// such that the number of slots is 2^fanout. for now we will assume that
// fanout is 4 thus there are 16 slots in a node, because this makes
// illustration easier. the other parameter of the protocol is the size of
// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
//
// each slot in a merkle tree node is in one of 4 states:
//
//   - empty
//   - live leaf
//   - dead leaf
//   - subtree
//
// in addition, each live or dead leaf contains a hash code which
// identifies an element of the set being synchronized. each subtree slot
// contains a hash code of the node immediately beneath it in the merkle
// tree. empty slots contain no hash codes.
//
// each node also summarizes, for sake of statistic-gathering, the number
// of set elements and total number of bytes in all of its subtrees, each
// stored as a size_t and sent as a uleb128.
//
// since empty slots have no hash code, they are represented implicitly by
// a bitmap at the head of each merkle tree node. as an additional
// integrity check, each merkle tree node contains a label indicating its
// prefix in the tree, and a hash of its own contents.
//
// in total, then, the byte-level representation of a <160,4> merkle tree
// node is as follows:
//
//      20 bytes       - hash of the remaining bytes in the node
//       1 byte        - type of this node (manifest, file, key, mcert, fcert)
//     1-N bytes       - level of this node in the tree (0 == "root", uleb128)
//    0-20 bytes       - the prefix of this node, 4 bits * level,
//                       rounded up to a byte
//     1-N bytes       - number of leaves under this node (uleb128)
//       4 bytes       - slot-state bitmap of the node
//   0-320 bytes       - between 0 and 16 live slots in the node
//
// so, in the worst case such a node is 367 bytes, with these parameters.
//
//
// protocol
// --------
//
// The protocol is a simple binary command-packet system over TCP;
// each packet consists of a single byte which identifies the protocol
// version, a byte which identifies the command name inside that
// version, a size_t sent as a uleb128 indicating the length of the
// packet, that many bytes of payload, and finally 20 bytes of SHA-1
// HMAC calculated over the payload.  The key for the SHA-1 HMAC is 20
// bytes of 0 during authentication, and a 20-byte random key chosen
// by the client after authentication (discussed below).
//
//---- Pre-v5 packet format ----
//
// the protocol is a simple binary command-packet system over tcp; each
// packet consists of a byte which identifies the protocol version, a byte
// which identifies the command name inside that version, a size_t sent as
// a uleb128 indicating the length of the packet, and then that many bytes
// of payload, and finally 4 bytes of adler32 checksum (in LSB order) over
// the payload.
//
// ---- end pre-v5 packet format ----
//
// decoding involves simply buffering until a sufficient number of
// bytes are received, then advancing the buffer pointer. any time an
// integrity check (adler32 for pre-v5, HMAC for post-v5) fails, the
// protocol is assumed to have lost synchronization, and the
// connection is dropped. the parties are free to drop the tcp stream
// at any point, if too much data is received or too much idle time
// passes; no commitments or transactions are made.
//
// one special command, "bye", is used to shut down a connection
// gracefully.  once each side has received all the data they want, they
// can send a "bye" command to the other side. as soon as either side has
// both sent and received a "bye" command, they drop the connection. if
// either side sees an i/o failure (dropped connection) after they have
// sent a "bye" command, they consider the shutdown successful.
//
// the exchange begins in a non-authenticated state. the server sends a
// "hello <id> <nonce>" command, which identifies the server's RSA key and
// issues a nonce which must be used for a subsequent authentication.
//
// The client then responds with either:
//
// An "auth (source|sink|both) <pattern> <id> <nonce1> <hmac key>
// <sig>" command, which identifies its RSA key, notes the role it
// wishes to play in the synchronization, identifies the pattern it
// wishes to sync with, signs the previous nonce with its own key, and
// informs the server of the HMAC key it wishes to use for this
// session (encrypted with the server's public key); or
//
// An "anonymous (source|sink|both) <pattern> <hmac key>" command,
// which identifies the role it wishes to play in the synchronization,
// the pattern it wishes to sync with, and the HMAC key it wishes to
// use for this session (also encrypted with the server's public key).
//
// The server then replies with a "confirm" command, which contains no
// other data but will only have the correct HMAC integrity code if
// the server received and properly decrypted the HMAC key offered by
// the client.  This transitions the peers into an authenticated state
// and begins refinement.
//
// ---- Pre-v5 authentication process notes ----
//
// the exchange begins in a non-authenticated state. the server sends a
// "hello <id> <nonce>" command, which identifies the server's RSA key and
// issues a nonce which must be used for a subsequent authentication.
// the client can then respond with an "auth (source|sink|both)
// <pattern> <id> <nonce1> <nonce2> <sig>" command which identifies its
// RSA key, notes the role it wishes to play in the synchronization,
// identifies the pattern it wishes to sync with, signs the previous
// nonce with its own key, and issues a nonce of its own for mutual
// authentication.
//
// the server can then respond with a "confirm <sig>" command, which is
// the signature of the second nonce sent by the client. this
// transitions the peers into an authenticated state and begins refinement.
//
// ---- End pre-v5 authentication process ----
//
// refinement begins with the client sending its root public key and
// manifest certificate merkle nodes to the server. the server then
// compares the root to each slot in *its* root node, and for each slot
// either sends refined subtrees to the client, or (if it detects a missing
// item in one pattern or the other) sends either "data" or "send_data"
// commands corresponding to the role of the missing item (source or
// sink). the client then receives each refined subtree and compares it
// with its own, performing similar description/request behavior depending
// on role, and the cycle continues.
//
// detecting the end of refinement is subtle: after sending the refinement
// of the root node, the server sends a "done 0" command (queued behind all
// the other refinement traffic). when either peer receives a "done N"
// command it immediately responds with a "done N+1" command. when two done
// commands for a given merkle tree arrive with no intervening refinements,
// the entire merkle tree is considered complete.
//
// any "send_data" command received prompts a "data" command in response,
// if the requested item exists. if an item does not exist, a "nonexistant"
// response command is sent.
//
// once a response is received for each requested key and revision cert
// (either data or nonexistant) the requesting party walks the graph of
// received revision certs and transmits send_data or send_delta commands
// for all the revisions mentioned in the certs which it does not already
// have in its database.
//
// for each revision it receives, the recipient requests all the file data or
// deltas described in that revision.
//
// once all requested files, manifests, revisions and certs are received (or
// noted as nonexistant), the recipient closes its connection.
//
// (aside: this protocol is raw binary because coding density is actually
// important here, and each packet consists of very information-dense
// material that you wouldn't have a hope of typing in manually anyways)
//

using namespace boost;
using namespace std;

// decoding guard: does nothing when 'check' holds; otherwise throws a
// bad_decode whose message names the failed 'context'.
static inline void
require(bool check, string const & context)
{
  if (check)
    return;

  throw bad_decode(F("check of '%s' failed") % context);
}

// per-tree refinement bookkeeping (the session keeps one of these per item
// type in its done_refinements map). both flags start out false.
struct done_marker
{
  bool current_level_had_refinements;
  bool tree_is_done;

  done_marker()
  {
    current_level_had_refinements = false;
    tree_is_done = false;
  }
};

// all the state belonging to one netsync connection: the socket and its
// buffers, authentication material, progress tickers, bookkeeping for
// refinement and outstanding requests, and the queue_* / process_* methods
// that produce and consume protocol commands.
//
// NOTE: members are initialized by the constructor in the order they are
// declared here; keep the constructor's initializer list in sync when
// reordering.
struct
session
{
  // role is one of sink_role / source_role / source_and_sink_role (see
  // setup_client_tickers); voice is compared against client_voice in the
  // constructor.
  protocol_role role;
  protocol_voice const voice;
  // patterns as handed to the constructor; a client must supply exactly one.
  vector<utf8> patterns;
  app_state & app;

  // connection identity and transport; the constructor builds 'str' from
  // the socket and timeout it is given.
  string peer_id;
  Netxx::socket_type fd;
  Netxx::Stream str;

  // raw byte buffers. outbuf is what write_netcmd_and_try_flush appends
  // encoded commands to; inbuf is presumably the matching receive buffer
  // -- TODO confirm against read_some().
  string inbuf;
  string outbuf;

  // NOTE(review): looks like the command currently being decoded and
  // whether it is complete ("armed"); confirm against arm().
  netcmd cmd;
  bool armed;
  bool arm();

  // the single pattern in effect for this session and its compiled regex;
  // for a client these are set from patterns[0] in the constructor,
  // otherwise they start as "" and ".*".
  utf8 pattern;
  boost::regex pattern_re;
  id remote_peer_key_hash;
  rsa_keypair_id remote_peer_key_name;
  // HMAC state; all three start out as constants::netsync_key_initializer.
  // session_key and write_hmac are passed to netcmd::write when queueing
  // outgoing commands.
  netsync_session_key session_key;
  netsync_hmac_value read_hmac;
  netsync_hmac_value write_hmac;
  bool authenticated;

  // timestamp refreshed by mark_recent_io().
  time_t last_io_time;
  // progress tickers. all start out NULL; setup_client_tickers() creates
  // the subset appropriate to 'role', and every user checks for NULL.
  auto_ptr<ticker> byte_in_ticker;
  auto_ptr<ticker> byte_out_ticker;
  auto_ptr<ticker> cert_in_ticker;
  auto_ptr<ticker> cert_out_ticker;
  auto_ptr<ticker> revision_in_ticker;
  auto_ptr<ticker> revision_out_ticker;
  auto_ptr<ticker> revision_checked_ticker;

  // items recorded by the *_written_callback methods; the destructor
  // reports them to the lua note_netsync_* hooks.
  vector<revision_id> written_revisions;
  vector<rsa_keypair_id> written_keys;
  vector<cert> written_certs;

  map<netcmd_item_type, boost::shared_ptr<merkle_table> > merkle_tables;

  // done_refinements: per-item-type refinement state; the constructor
  // seeds entries for cert, key and epoch items.
  // requested_items: per-item-type set of ids noted as requested
  // (note_item_requested) and not yet noted as arrived (note_item_arrived);
  // the constructor seeds an empty set for every item type.
  map<netcmd_item_type, done_marker> done_refinements;
  map<netcmd_item_type, boost::shared_ptr< set<id> > > requested_items;
  // revisions known to this session, consulted by analyze_attachment()
  // when walking parent edges.
  map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestry;
  map<revision_id, map<cert_name, vector<cert> > > received_certs;
  set< pair<id, id> > reverse_delta_requests;
  bool analyzed_ancestry;

  // saved_nonce is filled in by mk_nonce(), which draws its bytes from prng.
  id saved_nonce;
  bool received_goodbye;
  bool sent_goodbye;
  boost::scoped_ptr<CryptoPP::AutoSeededRandomPool> prng;

  // database writer behind a valve; maybe_note_epochs_finished() opens it
  // once epoch refinement is done and no epoch requests are outstanding.
  packet_db_valve dbw;

  session(protocol_role role,
          protocol_voice voice,
          vector<utf8> const & patterns,
          app_state & app,
          string const & peer,
          Netxx::socket_type sock,
          Netxx::Timeout const & to);

  virtual ~session();

  // callbacks registered on dbw by the constructor.
  void rev_written_callback(revision_id rid);
  void key_written_callback(rsa_keypair_id kid);
  void cert_written_callback(cert const & c);

  id mk_nonce();
  void mark_recent_io();

  void setup_client_tickers();

  // refinement / request progress queries.
  bool done_all_refinements();
  bool cert_refinement_done();
  bool all_requested_revisions_received();

  // maintenance of requested_items (and the "in" tickers on arrival).
  void note_item_requested(netcmd_item_type ty, id const & i);
  bool item_request_outstanding(netcmd_item_type ty, id const & i);
  void note_item_arrived(netcmd_item_type ty, id const & i);

  void maybe_note_epochs_finished();

  // bumps the "out" tickers for certs and revisions.
  void note_item_sent(netcmd_item_type ty, id const & i);

  bool got_all_data();
  void maybe_say_goodbye();

  // ancestry-graph analysis and the revision/file requests derived from it.
  void analyze_attachment(revision_id const & i,
                          set<revision_id> & visited,
                          map<revision_id, bool> & attached);
  void request_rev_revisions(revision_id const & init,
                             map<revision_id, bool> attached,
                             set<revision_id> visited);
  void request_fwd_revisions(revision_id const & i,
                             map<revision_id, bool> attached,
                             set<revision_id> & visited);
  void analyze_ancestry_graph();
  void analyze_manifest(manifest_map const & man);

  // socket-level i/o.
  Netxx::Probe::ready_type which_events() const;
  bool read_some();
  bool write_some();

  // set by error(); while true, write_netcmd_and_try_flush() drops any
  // further outgoing commands.
  bool encountered_error;
  void error(string const & errmsg);

  // queue_*: build an outgoing command of the named kind.
  void write_netcmd_and_try_flush(netcmd const & cmd);
  void queue_bye_cmd();
  void queue_error_cmd(string const & errmsg);
  void queue_done_cmd(size_t level, netcmd_item_type type);
  void queue_hello_cmd(id const & server,
                       id const & nonce);
  void queue_anonymous_cmd(protocol_role role,
                           string const & pattern,
                           id const & nonce2,
                           base64<rsa_pub_key> server_key_encoded);
  void queue_auth_cmd(protocol_role role,
                      string const & pattern,
                      id const & client,
                      id const & nonce1,
                      id const & nonce2,
                      string const & signature,
                      base64<rsa_pub_key> server_key_encoded);
  void queue_confirm_cmd();
  void queue_refine_cmd(merkle_node const & node);
  void queue_send_data_cmd(netcmd_item_type type,
                           id const & item);
  void queue_send_delta_cmd(netcmd_item_type type,
                            id const & base,
                            id const & ident);
  void queue_data_cmd(netcmd_item_type type,
                      id const & item,
                      string const & dat);
  void queue_delta_cmd(netcmd_item_type type,
                       id const & base,
                       id const & ident,
                       delta const & del);
  void queue_nonexistant_cmd(netcmd_item_type type,
                             id const & item);

  // process_*: handle an incoming command of the named kind.
  // NOTE(review): the bool result presumably tells the caller whether to
  // keep the connection going -- confirm against dispatch_payload().
  bool process_bye_cmd();
  bool process_error_cmd(string const & errmsg);
  bool process_done_cmd(size_t level, netcmd_item_type type);
  bool process_hello_cmd(rsa_keypair_id const & server_keyname,
                         rsa_pub_key const & server_key,
                         id const & nonce);
  bool process_anonymous_cmd(protocol_role role,
                             string const & pattern);
  bool process_auth_cmd(protocol_role role,
                        string const & pattern,
                        id const & client,
                        id const & nonce1,
                        string const & signature);
  void respond_to_auth_cmd(rsa_oaep_sha_data hmac_key_encrypted);
  bool process_confirm_cmd(string const & signature);
  void respond_to_confirm_cmd();
  bool process_refine_cmd(merkle_node const & node);
  bool process_send_data_cmd(netcmd_item_type type,
                             id const & item);
  bool process_send_delta_cmd(netcmd_item_type type,
                              id const & base,
                              id const & ident);
  bool process_data_cmd(netcmd_item_type type,
                        id const & item,
                        string const & dat);
  bool process_delta_cmd(netcmd_item_type type,
                         id const & base,
                         id const & ident,
                         delta const & del);
  bool process_nonexistant_cmd(netcmd_item_type type,
                               id const & item);

  // merkle tree access.
  bool merkle_node_exists(netcmd_item_type type,
                          size_t level,
                          prefix const & pref);

  void load_merkle_node(netcmd_item_type type,
                        size_t level,
                        prefix const & pref,
                        merkle_ptr & node);

  void rebuild_merkle_trees(app_state & app,
                            set<utf8> const & branches);

  // top-level command dispatch and session driving.
  bool dispatch_payload(netcmd const & cmd);
  void begin_service();
  bool process();
};


// small wrapper whose only member is a prefix constructed from the empty
// string.
struct root_prefix
{
  prefix val;

  root_prefix()
    : val("")
  {}
};

// returns the shared root_prefix instance.
//
// the instance deliberately lives inside this function rather than at
// namespace scope. mac OSX runs static initializers in the "wrong" order
// (application before libraries), so a namespace-scope static here would be
// initialized before a static string inside cryptopp; encode_hexenc() then
// fails during our initializer and the program crashes. a function-local
// static is not constructed until the first call, which avoids the problem.
static root_prefix const &
get_root_prefix()
{
  static root_prefix the_root;
  return the_root;
}


// builds a session over an already-connected socket.
//
// a client-voice session must be given exactly one pattern, which becomes
// both 'pattern' and the compiled 'pattern_re'. the constructor also hooks
// the database-writer callbacks, creates the random pool, and seeds the
// per-item-type refinement and request bookkeeping.
session::session(protocol_role role,
                 protocol_voice voice,
                 vector<utf8> const & patterns,
                 app_state & app,
                 string const & peer,
                 Netxx::socket_type sock,
                 Netxx::Timeout const & to) :
  role(role), voice(voice), patterns(patterns), app(app),
  peer_id(peer), fd(sock), str(sock, to),
  inbuf(""), outbuf(""),
  armed(false),
  pattern(""), pattern_re(".*"),
  remote_peer_key_hash(""), remote_peer_key_name(""),
  session_key(constants::netsync_key_initializer),
  read_hmac(constants::netsync_key_initializer),
  write_hmac(constants::netsync_key_initializer),
  authenticated(false),
  last_io_time(::time(NULL)),
  byte_in_ticker(NULL), byte_out_ticker(NULL),
  cert_in_ticker(NULL), cert_out_ticker(NULL),
  revision_in_ticker(NULL), revision_out_ticker(NULL),
  revision_checked_ticker(NULL),
  analyzed_ancestry(false),
  saved_nonce(""),
  received_goodbye(false), sent_goodbye(false),
  dbw(app, true),
  encountered_error(false)
{
  typedef boost::shared_ptr< set<id> > id_set_ptr;

  if (voice == client_voice)
    {
      N(patterns.size() == 1,
          F("client can only sync one pattern at a time"));
      this->pattern = idx(patterns, 0);
      this->pattern_re = boost::regex(this->pattern());
    }

  // have the database writer tell us about everything it commits, so the
  // destructor can report it to the lua hooks.
  dbw.set_on_revision_written(boost::bind(&session::rev_written_callback,
                                          this, _1));
  dbw.set_on_cert_written(boost::bind(&session::cert_written_callback,
                                      this, _1));
  dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback,
                                        this, _1));

  // if the user rejects the non-blocking RNG we must use a blocking one;
  // when none is available there is nothing left to do but give up.
  bool const non_blocking_ok = app.lua.hook_non_blocking_rng_ok();
#ifndef BLOCKING_RNG_AVAILABLE
  if (!non_blocking_ok)
    throw oops("no blocking RNG available and non-blocking RNG rejected");
  bool const want_blocking_rng = false;
#else
  bool const want_blocking_rng = !non_blocking_ok;
#endif
  prng.reset(new CryptoPP::AutoSeededRandomPool(want_blocking_rng));

  // item types that go through merkle refinement.
  netcmd_item_type const refined_types[] =
    { cert_item, key_item, epoch_item };
  for (size_t n = 0; n < sizeof(refined_types) / sizeof(refined_types[0]); ++n)
    done_refinements.insert(make_pair(refined_types[n], done_marker()));

  // item types that may be requested; each starts with no outstanding
  // requests.
  netcmd_item_type const requestable_types[] =
    { cert_item, key_item, revision_item,
      manifest_item, file_item, epoch_item };
  for (size_t n = 0;
       n < sizeof(requestable_types) / sizeof(requestable_types[0]); ++n)
    requested_items.insert(make_pair(requestable_types[n],
                                     id_set_ptr(new set<id>())));
}

// on teardown, report everything this session wrote to the lua
// note_netsync_* hooks: first public keys, then each written revision
// together with the certs written for it, and finally the certs whose
// revision was not written during this session.
session::~session()
{
  typedef map<revision_id, vector<cert> > revcert_map;
  typedef set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > cert_triples;

  // one (initially empty) bucket per written revision.
  revcert_map revcerts;
  for (vector<revision_id>::iterator r = written_revisions.begin();
       r != written_revisions.end(); ++r)
    revcerts.insert(make_pair(*r, vector<cert>()));

  // sort each written cert into its revision's bucket, or set it aside.
  vector<cert> unattached_certs;
  for (vector<cert>::iterator c = written_certs.begin();
       c != written_certs.end(); ++c)
    {
      revcert_map::iterator bucket = revcerts.find(c->ident);
      if (bucket != revcerts.end())
        bucket->second.push_back(*c);
      else
        unattached_certs.push_back(*c);
    }

  // keys
  for (vector<rsa_keypair_id>::iterator k = written_keys.begin();
       k != written_keys.end(); ++k)
    app.lua.hook_note_netsync_pubkey_received(*k);

  // revisions, each with its decoded certs
  for (vector<revision_id>::iterator r = written_revisions.begin();
       r != written_revisions.end(); ++r)
    {
      vector<cert> & attached(revcerts[*r]);
      cert_triples certs;
      for (vector<cert>::const_iterator c = attached.begin();
           c != attached.end(); ++c)
        {
          cert_value decoded;
          decode_base64(c->value, decoded);
          certs.insert(make_pair(c->key, make_pair(c->name, decoded)));
        }
      app.lua.hook_note_netsync_revision_received(*r, certs);
    }

  // certs not attached to a new revision
  for (vector<cert>::iterator c = unattached_certs.begin();
       c != unattached_certs.end(); ++c)
    {
      cert_value decoded;
      decode_base64(c->value, decoded);
      app.lua.hook_note_netsync_cert_received(c->ident, c->key,
                                              c->name, decoded);
    }
}

// database-writer callback: bumps the "revs written" ticker when one is
// active and records the revision for the destructor's hook reports.
void session::rev_written_callback(revision_id rid)
{
  ticker * const checked = revision_checked_ticker.get();
  if (checked)
    ++(*checked);

  this->written_revisions.push_back(rid);
}

// database-writer callback: records a newly written public key so the
// destructor can report it.
void session::key_written_callback(rsa_keypair_id kid)
{
  this->written_keys.insert(this->written_keys.end(), kid);
}

// database-writer callback: records a newly written cert so the destructor
// can report it.
void session::cert_written_callback(cert const & c)
{
  this->written_certs.insert(this->written_certs.end(), c);
}

// generates a fresh random nonce of merkle_hash_length_in_bytes bytes,
// remembers it in saved_nonce and returns it. asserts that no nonce has
// been saved yet.
id
session::mk_nonce()
{
  I(this->saved_nonce().size() == 0);

  char raw[constants::merkle_hash_length_in_bytes];
  byte * const dest = reinterpret_cast<byte *>(raw);
  prng->GenerateBlock(dest, constants::merkle_hash_length_in_bytes);

  // construct from (pointer, length) so embedded NUL bytes are kept.
  this->saved_nonce = string(raw, sizeof(raw));
  I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
  return this->saved_nonce;
}

void
session::mark_recent_io()
{
  last_io_time = ::time(NULL);
}

// creates the progress tickers appropriate to this session's role. byte
// counters are always created; the cert/revision tickers depend on whether
// we are a sink, a source, or both.
void
session::setup_client_tickers()
{
  byte_in_ticker.reset(new ticker("bytes in", ">", 1024, true));
  byte_out_ticker.reset(new ticker("bytes out", "<", 1024, true));

  switch (role)
    {
    case sink_role:
      revision_checked_ticker.reset(new ticker("revs written", "w", 1));
      cert_in_ticker.reset(new ticker("certs in", "c", 3));
      revision_in_ticker.reset(new ticker("revs in", "r", 1));
      break;

    case source_role:
      cert_out_ticker.reset(new ticker("certs out", "C", 3));
      revision_out_ticker.reset(new ticker("revs out", "R", 1));
      break;

    default:
      // the only remaining legal role; no cert tickers in this mode.
      I(role == source_and_sink_role);
      revision_checked_ticker.reset(new ticker("revs written", "w", 1));
      revision_in_ticker.reset(new ticker("revs in", "r", 1));
      revision_out_ticker.reset(new ticker("revs out", "R", 1));
      break;
    }
}

bool
session::done_all_refinements()
{
  bool all = true;
  for (map<netcmd_item_type, done_marker>::const_iterator j =
         done_refinements.begin(); j != done_refinements.end(); ++j)
    {
      if (j->second.tree_is_done == false)
        all = false;
    }
  return all;
}


bool
session::cert_refinement_done()
{
  return done_refinements[cert_item].tree_is_done;
}

bool
session::got_all_data()
{
  for (map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator i =
         requested_items.begin(); i != requested_items.end(); ++i)
    {
      if (! i->second->empty())
        return false;
    }
  return true;
}

bool
session::all_requested_revisions_received()
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(revision_item);
  I(i != requested_items.end());
  return i->second->empty();
}

void
session::maybe_note_epochs_finished()
{
  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
    i = requested_items.find(epoch_item);
  I(i != requested_items.end());
  // Maybe there are outstanding epoch requests.
  if (!i->second->empty())
    return;
  // And maybe we haven't even finished the refinement.
  if (!done_refinements[epoch_item].tree_is_done)
    return;
  // But otherwise, we're ready to go!
  L(F("all epochs processed, opening database valve\n"));
  this->dbw.open_valve();
}

// records that 'ident' of type 'ty' has been requested from the peer.
void
session::note_item_requested(netcmd_item_type ty, id const & ident)
{
  typedef map<netcmd_item_type, boost::shared_ptr< set<id> > > request_map;

  request_map::const_iterator i = requested_items.find(ty);
  I(i != requested_items.end());

  set<id> & outstanding = *(i->second);
  outstanding.insert(ident);
}

// records that 'ident' of type 'ty' has arrived: removes it from the
// outstanding requests and bumps the matching "in" ticker, if there is one.
void
session::note_item_arrived(netcmd_item_type ty, id const & ident)
{
  typedef map<netcmd_item_type, boost::shared_ptr< set<id> > > request_map;

  request_map::const_iterator i = requested_items.find(ty);
  I(i != requested_items.end());
  i->second->erase(ident);

  // only certs and revisions have tickers.
  ticker * counter = NULL;
  if (ty == cert_item)
    counter = cert_in_ticker.get();
  else if (ty == revision_item)
    counter = revision_in_ticker.get();

  if (counter != NULL)
    ++(*counter);
}

// true iff 'ident' of type 'ty' has been requested and has not yet arrived.
bool
session::item_request_outstanding(netcmd_item_type ty, id const & ident)
{
  typedef map<netcmd_item_type, boost::shared_ptr< set<id> > > request_map;

  request_map::const_iterator i = requested_items.find(ty);
  I(i != requested_items.end());

  return i->second->count(ident) != 0;
}


// bumps the matching "out" ticker after an item of type 'ty' was sent.
// 'ident' is not used.
void
session::note_item_sent(netcmd_item_type ty, id const & ident)
{
  // only certs and revisions have tickers.
  ticker * counter = NULL;
  if (ty == cert_item)
    counter = cert_out_ticker.get();
  else if (ty == revision_item)
    counter = revision_out_ticker.get();

  if (counter != NULL)
    ++(*counter);
}

// appends the encoded form of 'cmd' to outbuf, unless the session is in
// error unwind mode, in which case the command is logged and dropped.
void
session::write_netcmd_and_try_flush(netcmd const & cmd)
{
  if (encountered_error)
    L(F("dropping outgoing netcmd (because we're in error unwind mode)\n"));
  else
    cmd.write(outbuf, session_key, write_hmac);

  // FIXME: flushing here helps keep the protocol pipeline full but it
  // seems to interfere with initial and final sequences. careful with it.
  // write_some();
  // read_some();
}

// Puts the session into "error unwind" mode: warns locally, queues an error
// command carrying 'errmsg' for the peer, and sets encountered_error.  In
// this mode all received data is ignored and no new data is queued; we
// simply stay connected long enough for the current write buffer to be
// flushed, so that our peer receives the error message.
//
// WARNING WARNING WARNING (FIXME): this does _not_ throw an exception.  If
// you encounter an error while processing a netcmd packet, you may _only_
// call this method if you have not touched the database; if you have
// touched the database you need to throw an exception instead, to trigger
// a rollback.
//
// You could, of course, call this method and then throw an exception, but
// there is no point in doing that: throwing the exception causes the
// connection to be terminated immediately, so the call to error() has no
// effect (except to cause your error message to be printed twice).
void
session::error(std::string const & errmsg)
{
  W(F("error: %s\n") % errmsg);
  // queue the error command before raising the flag: once the flag is set,
  // write_netcmd_and_try_flush() drops outgoing commands (presumably
  // queue_error_cmd() goes through it -- TODO confirm).
  queue_error_cmd(errmsg);
  encountered_error = true;
}

// walks every entry of 'man' and queues a send_data request for each file
// version our database does not have.
void
session::analyze_manifest(manifest_map const & man)
{
  L(F("analyzing %d entries in manifest\n") % man.size());

  manifest_map::const_iterator entry = man.begin();
  for (; entry != man.end(); ++entry)
    {
      if (this->app.db.file_version_exists(manifest_entry_id(entry)))
        continue;

      // the wire protocol wants the raw id, not its hex encoding.
      id raw_id;
      decode_hexenc(manifest_entry_id(entry).inner(), raw_id);
      queue_send_data_cmd(file_item, raw_id);
    }
}

// looks up the attachment verdict previously recorded for revision 'i'.
// it is an invariant failure to ask about a revision with no entry.
static bool
is_attached(revision_id const & i,
            map<revision_id, bool> const & attach_map)
{
  typedef map<revision_id, bool>::const_iterator verdict_iter;

  verdict_iter j = attach_map.find(i);
  I(j != attach_map.end());

  bool const verdict = j->second;
  return verdict;
}

// decides whether revision 'i' is "attached" and records the verdict in
// 'attached'. a revision is attached when our database already has it, or
// when at least one of its parents in the ancestry graph we are currently
// requesting is (recursively) attached. detached revisions are requested
// using a different (more efficient and less failure-prone) algorithm.
//
// 'visited' guards against analyzing a revision twice; every parent is
// analyzed even after an attached one has been found.

void
session::analyze_attachment(revision_id const & i,
                            set<revision_id> & visited,
                            map<revision_id, bool> & attached)
{
  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;

  // insert() reports whether 'i' was newly added; if not, it has been
  // handled already.
  if (!visited.insert(i).second)
    return;

  bool curr_attached = false;

  if (!app.db.revision_exists(i))
    {
      L(F("checking attachment of %s in ancestry\n") % i);
      ancestryT::const_iterator known = ancestry.find(i);
      if (known != ancestry.end())
        {
          edge_map const & parents = known->second->second.edges;
          for (edge_map::const_iterator k = parents.begin();
               k != parents.end(); ++k)
            {
              revision_id const & parent = edge_old_revision(k);
              L(F("checking attachment of %s in parent %s\n") % i % parent);
              analyze_attachment(parent, visited, attached);
              if (is_attached(parent, attached))
                {
                  L(F("revision %s is attached via parent %s\n") % i % parent);
                  curr_attached = true;
                }
            }
        }
    }
  else
    {
      L(F("revision %s is attached via database\n") % i);
      curr_attached = true;
    }

  L(F("decided that revision %s %s attached\n") % i % (curr_attached ? "is" : "is not"));
  attached[i] = curr_attached;
}

// decode the hexenc<id> carried by a manifest_id into a plain 'id', the
// form handed to the queue_send_*_cmd calls in this file.
inline static id
plain_id(manifest_id const & i)
{
  hexenc<id> const encoded(i.inner());
  id decoded;
  decode_hexenc(encoded, decoded);
  return decoded;
}

// decode the hexenc<id> carried by a file_id into a plain 'id', the form
// handed to the queue_send_*_cmd calls in this file.
inline static id
plain_id(file_id const & i)
{
  hexenc<id> const encoded(i.inner());
  id decoded;
  decode_hexenc(encoded, decoded);
  return decoded;
}

// request, in reverse, what is needed to store the non-attached revisions
// reachable from 'init'.
//
// the received ancestry graph is walked from 'init' towards its parents,
// one generation (frontier) at a time. a revision is skipped when it is
// marked attached in 'attached', when it is already in 'visited', or when
// it has no entry in 'ancestry'. for every other revision, each of its
// edges is examined:
//   - the child manifest / file is requested in full unless it was already
//     handled during this walk or exists in the database;
//   - for a nonempty parent, a delta child -> parent is requested (and
//     recorded in reverse_delta_requests) unless the parent exists in the
//     database.
// finally the revision data itself is handed to the packet_db_writer.
//
// both 'attached' and 'visited' are taken by value, so additions made to
// 'visited' here are not seen by the caller.
void
session::request_rev_revisions(revision_id const & init,
                               map<revision_id, bool> attached,
                               set<revision_id> visited)
{
  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;

  // manifests and files already dealt with during this walk
  set<manifest_id> seen_manifests;
  set<file_id> seen_files;

  set<revision_id> frontier;
  frontier.insert(init);
  while (!frontier.empty())
    {
      set<revision_id> next_frontier;
      for (set<revision_id>::const_iterator rev = frontier.begin();
           rev != frontier.end(); ++rev)
        {
          // attached revisions are not handled by this walk
          if (is_attached(*rev, attached))
            continue;

          // insert() reports (via .second) whether the revision was new
          if (!visited.insert(*rev).second)
            continue;

          ancestryT::const_iterator const node = ancestry.find(*rev);
          if (node == ancestry.end())
            continue;

          revision_set const & rset = node->second->second;
          for (edge_map::const_iterator e = rset.edges.begin();
               e != rset.edges.end(); ++e)
            {
              // the parent is examined in the next generation
              next_frontier.insert(edge_old_revision(e));

              // check out the manifest delta edge
              manifest_id parent_manifest = edge_old_manifest(e);
              manifest_id child_manifest = rset.new_manifest;

              // first, if we have a child we've never seen before we will
              // need to request it in its entirety.
              if (seen_manifests.find(child_manifest) == seen_manifests.end())
                {
                  if (!this->app.db.manifest_version_exists(child_manifest))
                    {
                      L(F("requesting (in reverse) initial manifest data %s\n") % child_manifest);
                      queue_send_data_cmd(manifest_item, plain_id(child_manifest));
                    }
                  else
                    L(F("not requesting (in reverse) initial manifest %s as we already have it\n") % child_manifest);
                  seen_manifests.insert(child_manifest);
                }

              // second, if the parent is nonempty, we want to ask for an
              // edge to it
              if (!parent_manifest.inner()().empty())
                {
                  if (!this->app.db.manifest_version_exists(parent_manifest))
                    {
                      L(F("requesting (in reverse) manifest delta %s -> %s\n")
                        % child_manifest % parent_manifest);
                      id const child_raw = plain_id(child_manifest);
                      id const parent_raw = plain_id(parent_manifest);
                      reverse_delta_requests.insert(make_pair(child_raw, parent_raw));
                      queue_send_delta_cmd(manifest_item, child_raw, parent_raw);
                    }
                  else
                    L(F("not requesting (in reverse) manifest delta to %s as we already have it\n") % parent_manifest);
                  // the parent counts as seen, so it is not requested in
                  // full when it later shows up as a child
                  seen_manifests.insert(parent_manifest);
                }

              // check out each file delta edge
              change_set const & cset = edge_changes(e);
              for (change_set::delta_map::const_iterator d = cset.deltas.begin();
                   d != cset.deltas.end(); ++d)
                {
                  file_id parent_file(delta_entry_src(d));
                  file_id child_file(delta_entry_dst(d));

                  // first, if we have a child we've never seen before we
                  // will need to request it in its entirety.
                  if (seen_files.find(child_file) == seen_files.end())
                    {
                      if (!this->app.db.file_version_exists(child_file))
                        {
                          L(F("requesting (in reverse) initial file data %s\n") % child_file);
                          queue_send_data_cmd(file_item, plain_id(child_file));
                        }
                      else
                        L(F("not requesting (in reverse) initial file %s as we already have it\n") % child_file);
                      seen_files.insert(child_file);
                    }

                  // second, if the parent is nonempty, we want to ask for
                  // an edge to it
                  if (!parent_file.inner()().empty())
                    {
                      if (!this->app.db.file_version_exists(parent_file))
                        {
                          L(F("requesting (in reverse) file delta %s -> %s on %s\n")
                            % child_file % parent_file % delta_entry_path(d));
                          id const child_raw = plain_id(child_file);
                          id const parent_raw = plain_id(parent_file);
                          reverse_delta_requests.insert(make_pair(child_raw, parent_raw));
                          queue_send_delta_cmd(file_item, child_raw, parent_raw);
                        }
                      else
                        L(F("not requesting (in reverse) file delta to %s as we already have it\n") % parent_file);
                      seen_files.insert(parent_file);
                    }
                }
            }

          // now actually consume the data packet, which will wait on the
          // arrival of its prerequisites in the packet_db_writer
          this->dbw.consume_revision_data(node->first, node->second->first);
        }

      // next_frontier is discarded right after, so swapping is equivalent
      // to assigning it
      frontier.swap(next_frontier);
    }
}

// request, in the forward direction, what is needed to store revision 'i'.
//
// revisions already in 'visited', or without an entry in 'ancestry', are
// left alone. otherwise every attached parent is handled first
// (recursively), and then the manifest and files of 'i' are requested
// relative to one attached parent edge (the last one found): as full data
// when the parent side is empty, as a forward delta parent -> child
// otherwise, and not at all when the database already has the child side.
// at least one parent must be attached, or the invariant check fires.
// finally the revision data itself is handed to the packet_db_writer.
//
// NOTE(review): 'attached' is passed by value, so every recursive call
// copies the whole map. a const reference would avoid that, but the
// declaration would have to change along with this definition.
void
session::request_fwd_revisions(revision_id const & i,
                               map<revision_id, bool> attached,
                               set<revision_id> & visited)
{
  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;

  // insert() reports (via .second) whether the revision was new to the set
  if (!visited.insert(i).second)
    return;

  L(F("visiting revision '%s' for forward deltas\n") % i);

  ancestryT::const_iterator j = ancestry.find(i);
  if (j == ancestry.end())
    return;

  edge_map const & parent_edges = j->second->second.edges;
  edge_map::const_iterator an_attached_edge = parent_edges.end();

  // first make sure we've requested enough to get to here by
  // calling ourselves recursively. this is the forward path after all.
  for (edge_map::const_iterator e = parent_edges.begin();
       e != parent_edges.end(); ++e)
    {
      if (!is_attached(edge_old_revision(e), attached))
        continue;

      request_fwd_revisions(edge_old_revision(e), attached, visited);
      an_attached_edge = e;
    }

  I(an_attached_edge != j->second->second.edges.end());

  // check out the manifest delta edge
  manifest_id parent_manifest = edge_old_manifest(an_attached_edge);
  manifest_id child_manifest = j->second->second.new_manifest;
  if (this->app.db.manifest_version_exists(child_manifest))
    L(F("not requesting forward manifest delta to '%s' as we already have it\n")
      % child_manifest);
  else if (parent_manifest.inner()().empty())
    {
      // no parent manifest to build upon, so ask for the whole thing
      L(F("requesting full manifest data %s\n") % child_manifest);
      queue_send_data_cmd(manifest_item, plain_id(child_manifest));
    }
  else
    {
      L(F("requesting forward manifest delta %s -> %s\n")
        % parent_manifest % child_manifest);
      queue_send_delta_cmd(manifest_item,
                           plain_id(parent_manifest),
                           plain_id(child_manifest));
    }

  // check out each file delta edge
  change_set const & an_attached_cset = edge_changes(an_attached_edge);
  for (change_set::delta_map::const_iterator d = an_attached_cset.deltas.begin();
       d != an_attached_cset.deltas.end(); ++d)
    {
      if (this->app.db.file_version_exists(delta_entry_dst(d)))
        {
          L(F("not requesting forward delta %s -> %s on file %s as we already have it\n")
            % delta_entry_src(d) % delta_entry_dst(d) % delta_entry_path(d));
          continue;
        }

      if (delta_entry_src(d).inner()().empty())
        {
          // no source version to build upon, so ask for the whole file
          L(F("requesting full file data %s\n") % delta_entry_dst(d));
          queue_send_data_cmd(file_item, plain_id(delta_entry_dst(d)));
        }
      else
        {
          L(F("requesting forward delta %s -> %s on file %s\n")
            % delta_entry_src(d) % delta_entry_dst(d) % delta_entry_path(d));
          queue_send_delta_cmd(file_item,
                               plain_id(delta_entry_src(d)),
                               plain_id(delta_entry_dst(d)));
        }
    }

  // now actually consume the data packet, which will wait on the
  // arrival of its prerequisites in the packet_db_writer
  this->dbw.consume_revision_data(j->first, j->second->first);
}

void
session::analyze_ancestry_graph()
{
  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
  typedef map<cert_name, vector<cert> > cert_map;

  if (! (all_requested_revisions_received() && cert_refinement_done()))
    return;

  if (analyzed_ancestry)
    return;

  set<revision_id> heads;
  {
    set<revision_id> nodes, parents;
    map<revision_id, int> chld_num;
    L(F("analyzing %d ancestry edges\n") % ancestry.size());

    for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
      {
        nodes.insert(i->first);
        for (edge_map::const_iterator j = i->second->second.edges.begin();
             j != i->second->second.edges.end(); ++j)
          {
            parents.insert(edge_old_revision(j));
            map<revision_id, int>::iterator n;
            n = chld_num.find(edge_old_revision(j));
            if (n == chld_num.end())
              chld_num.insert(make_pair(edge_old_revision(j), 1));
            else
              ++(n->second);
          }
      }

    set_difference(nodes.begin(), nodes.end(),
                   parents.begin(), parents.end(),
                   inserter(heads, heads.begin()));

    // Write permissions checking:
    // remove heads w/o proper certs, add their children to heads
    // 1) remove unwanted branch certs from consideration
    //    - server: check write permission hook
    //    - client: check against sync pattern
    // 2) remove heads w/o a branch tag, process new exposed heads
    // 3) repeat 2 until no change

    //1
    set<string> ok_branches, bad_branches;
    cert_name bcert_name(branch_cert_name);
    cert_name tcert_name(tag_cert_name);
    for (map<revision_id, cert_map>::iterator i = received_certs.begin();
        i != received_certs.end(); ++i)
      {</