The unified diff between revisions [53526fc1..] and [5587f68e..] is displayed below. It can also be downloaded as a raw diff.

This diff has been restricted to the following files: 'netsync.cc'

#
#
# patch "netsync.cc"
#  from [c4933c5062e859a98b69e9dfa872b45a5be919da]
#    to [5d53bf85b20e88af400ccea87f1233eefdfd5daa]
#
============================================================
--- netsync.cc	c4933c5062e859a98b69e9dfa872b45a5be919da
+++ netsync.cc	5d53bf85b20e88af400ccea87f1233eefdfd5daa
@@ -8,6 +8,7 @@
 #include <memory>
 #include <list>
 #include <deque>
+#include <stack>

 #include <time.h>

@@ -306,6 +307,7 @@ session
   map<netcmd_item_type, done_marker> done_refinements;
   map<netcmd_item_type, boost::shared_ptr< set<id> > > requested_items;
   map<netcmd_item_type, boost::shared_ptr< set<id> > > received_items;
+  map<netcmd_item_type, boost::shared_ptr< set<id> > > full_delta_items;
   map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestry;
   map<revision_id, map<cert_name, vector<cert> > > received_certs;
   set< pair<id, id> > reverse_delta_requests;
@@ -345,6 +347,7 @@ session
   bool all_requested_revisions_received();

   void note_item_requested(netcmd_item_type ty, id const & i);
+  void note_item_full_delta(netcmd_item_type ty, id const & ident);
   bool item_already_requested(netcmd_item_type ty, id const & i);
   void note_item_arrived(netcmd_item_type ty, id const & i);

@@ -355,15 +358,8 @@ session
   bool got_all_data();
   void maybe_say_goodbye();

-  void analyze_attachment(revision_id const & i,
-                          set<revision_id> & visited,
-                          map<revision_id, bool> & attached);
-  void request_rev_revisions(revision_id const & init,
-                             map<revision_id, bool> attached,
-                             set<revision_id> visited);
-  void request_fwd_revisions(revision_id const & i,
-                             map<revision_id, bool> attached,
-                             set<revision_id> & visited);
+  void get_heads_and_consume_certs(set<revision_id> & heads);
+
   void analyze_ancestry_graph();
   void analyze_manifest(manifest_map const & man);

@@ -461,7 +457,39 @@ session
   bool process();
 };

+struct
+ancestry_fetcher
+{
+  session & sess;

+  // map children to parents
+  multimap< file_id, file_id > rev_file_deltas;
+  multimap< manifest_id, manifest_id > rev_manifest_deltas;
+  // map an ancestor to a child
+  multimap< file_id, file_id > fwd_file_deltas;
+  multimap< manifest_id, manifest_id > fwd_manifest_deltas;
+
+  set< file_id > seen_files;
+
+  ancestry_fetcher(session & s);
+  // analysing the ancestry graph
+  void traverse_files(change_set const & cset);
+  void traverse_manifest(manifest_id const & child_man,
+                         manifest_id const & parent_man);
+  void traverse_ancestry(set<revision_id> const & heads);
+
+  // requesting the data
+  void request_rev_file_deltas(file_id const & start,
+                               set<file_id> & done_files);
+  void request_files();
+  void request_rev_manifest_deltas(manifest_id const & start,
+                                   set<manifest_id> & done_manifests);
+  void request_manifests();
+
+
+};
+
+
 struct
 root_prefix
 {
@@ -483,6 +511,7 @@ get_root_prefix()
   return ROOT_PREFIX;
 }

+static file_id null_ident;

 session::session(protocol_role role,
                  protocol_voice voice,
@@ -549,6 +578,9 @@ session::session(protocol_role role,
   received_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
   received_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
   received_items.insert(make_pair(epoch_item, boost::shared_ptr< set<id> >(new set<id>())));
+
+  full_delta_items.insert(make_pair(manifest_item, boost::shared_ptr< set<id> >(new set<id>())));
+  full_delta_items.insert(make_pair(file_item, boost::shared_ptr< set<id> >(new set<id>())));
 }

 session::~session()
@@ -760,6 +792,15 @@ void
 }

 void
+session::note_item_full_delta(netcmd_item_type ty, id const & ident)
+{
+  map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
+    i = full_delta_items.find(ty);
+  I(i != full_delta_items.end());
+  i->second->insert(ident);
+}
+
+void
 session::note_item_arrived(netcmd_item_type ty, id const & ident)
 {
   map<netcmd_item_type, boost::shared_ptr< set<id> > >::const_iterator
@@ -879,67 +920,6 @@ session::analyze_manifest(manifest_map c
     }
 }

-static bool
-is_attached(revision_id const & i,
-            map<revision_id, bool> const & attach_map)
-{
-  map<revision_id, bool>::const_iterator j = attach_map.find(i);
-  I(j != attach_map.end());
-  return j->second;
-}
-
-// this tells us whether a particular revision is "attached" -- meaning
-// either our database contains the underlying manifest or else one of our
-// parents (recursively, and only in the current ancestry graph we're
-// requesting) is attached. if it's detached we will request it using a
-// different (more efficient and less failure-prone) algorithm
-
-void
-session::analyze_attachment(revision_id const & i,
-                            set<revision_id> & visited,
-                            map<revision_id, bool> & attached)
-{
-  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
-
-  if (visited.find(i) != visited.end())
-    return;
-
-  visited.insert(i);
-
-  bool curr_attached = false;
-
-  if (app.db.revision_exists(i))
-    {
-      L(F("revision %s is attached via database\n") % i);
-      curr_attached = true;
-    }
-  else
-    {
-      L(F("checking attachment of %s in ancestry\n") % i);
-      ancestryT::const_iterator j = ancestry.find(i);
-      if (j != ancestry.end())
-        {
-          for (edge_map::const_iterator k = j->second->second.edges.begin();
-               k != j->second->second.edges.end(); ++k)
-            {
-              L(F("checking attachment of %s in parent %s\n") % i % edge_old_revision(k));
-              analyze_attachment(edge_old_revision(k), visited, attached);
-              if (is_attached(edge_old_revision(k), attached))
-                {
-                  L(F("revision %s is attached via parent %s\n") % i % edge_old_revision(k));
-                  curr_attached = true;
-                }
-            }
-        }
-    }
-  if (curr_attached)
-    L(F("decided that revision '%s' is attached\n") % i);
-  else
-    L(F("decided that revision '%s' is not attached\n") % i);
-
-  attached[i] = curr_attached;
-}
-
 inline static id
 plain_id(manifest_id const & i)
 {
@@ -958,333 +938,114 @@ plain_id(file_id const & i)
   return tmp;
 }

-void
-session::request_rev_revisions(revision_id const & init,
-                               map<revision_id, bool> attached,
-                               set<revision_id> visited)
+void
+session::get_heads_and_consume_certs( set<revision_id> & heads )
 {
   typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
+  typedef map<cert_name, vector<cert> > cert_map;

-  set<manifest_id> seen_manifests;
-  set<file_id> seen_files;
+  set<revision_id> nodes, parents;
+  map<revision_id, int> chld_num;
+  L(F("analyzing %d ancestry edges\n") % ancestry.size());

-  set<revision_id> frontier;
-  frontier.insert(init);
-  while(!frontier.empty())
+  for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
     {
-      set<revision_id> next_frontier;
-      for (set<revision_id>::const_iterator i = frontier.begin();
-           i != frontier.end(); ++i)
+      nodes.insert(i->first);
+      for (edge_map::const_iterator j = i->second->second.edges.begin();
+           j != i->second->second.edges.end(); ++j)
         {
-          if (is_attached(*i, attached))
-            continue;
-
-          if (visited.find(*i) != visited.end())
-            continue;
-
-          visited.insert(*i);
-
-          ancestryT::const_iterator j = ancestry.find(*i);
-          if (j != ancestry.end())
-            {
-
-              for (edge_map::const_iterator k = j->second->second.edges.begin();
-                   k != j->second->second.edges.end(); ++k)
-                {
-
-                  next_frontier.insert(edge_old_revision(k));
-
-                  // check out the manifest delta edge
-                  manifest_id parent_manifest = edge_old_manifest(k);
-                  manifest_id child_manifest = j->second->second.new_manifest;
-
-                  // first, if we have a child we've never seen before we will need
-                  // to request it in its entrety.
-                  if (seen_manifests.find(child_manifest) == seen_manifests.end())
-                    {
-                      if (this->app.db.manifest_version_exists(child_manifest))
-                        L(F("not requesting (in reverse) initial manifest %s as we already have it\n") % child_manifest);
-                      else
-                        {
-                          L(F("requesting (in reverse) initial manifest data %s\n") % child_manifest);
-                          queue_send_data_cmd(manifest_item, plain_id(child_manifest));
-                        }
-                      seen_manifests.insert(child_manifest);
-                    }
-
-                  // second, if the parent is nonempty, we want to ask for an edge to it
-                  if (!parent_manifest.inner()().empty())
-                    {
-                      if (this->app.db.manifest_version_exists(parent_manifest))
-                        L(F("not requesting (in reverse) manifest delta to %s as we already have it\n") % parent_manifest);
-                      else
-                        {
-                          L(F("requesting (in reverse) manifest delta %s -> %s\n")
-                            % child_manifest % parent_manifest);
-                          reverse_delta_requests.insert(make_pair(plain_id(child_manifest),
-                                                                  plain_id(parent_manifest)));
-                          queue_send_delta_cmd(manifest_item,
-                                               plain_id(child_manifest),
-                                               plain_id(parent_manifest));
-                        }
-                      seen_manifests.insert(parent_manifest);
-                    }
-
-
-
-                  // check out each file delta edge
-                  change_set const & cset = edge_changes(k);
-                  for (change_set::delta_map::const_iterator d = cset.deltas.begin();
-                       d != cset.deltas.end(); ++d)
-                    {
-                      file_id parent_file (delta_entry_src(d));
-                      file_id child_file (delta_entry_dst(d));
-
-
-                      // first, if we have a child we've never seen before we will need
-                      // to request it in its entrety.
-                      if (seen_files.find(child_file) == seen_files.end())
-                        {
-                          if (this->app.db.file_version_exists(child_file))
-                            L(F("not requesting (in reverse) initial file %s as we already have it\n") % child_file);
-                          else
-                            {
-                              L(F("requesting (in reverse) initial file data %s\n") % child_file);
-                              queue_send_data_cmd(file_item, plain_id(child_file));
-                            }
-                          seen_files.insert(child_file);
-                        }
-
-                      // second, if the parent is nonempty, we want to ask for an edge to it
-                      if (!parent_file.inner()().empty())
-                        {
-                          if (this->app.db.file_version_exists(parent_file))
-                            L(F("not requesting (in reverse) file delta to %s as we already have it\n") % parent_file);
-                          else
-                            {
-                              L(F("requesting (in reverse) file delta %s -> %s on %s\n")
-                                % child_file % parent_file % delta_entry_path(d));
-                              reverse_delta_requests.insert(make_pair(plain_id(child_file),
-                                                                      plain_id(parent_file)));
-                              queue_send_delta_cmd(file_item,
-                                                   plain_id(child_file),
-                                                   plain_id(parent_file));
-                            }
-                          seen_files.insert(parent_file);
-                        }
-                    }
-                }
-
-              // now actually consume the data packet, which will wait on the
-              // arrival of its prerequisites in the packet_db_writer
-              this->dbw.consume_revision_data(j->first, j->second->first);
-            }
+          parents.insert(edge_old_revision(j));
+          map<revision_id, int>::iterator n;
+          n = chld_num.find(edge_old_revision(j));
+          if (n == chld_num.end())
+            chld_num.insert(make_pair(edge_old_revision(j), 1));
+          else
+            ++(n->second);
         }
-      frontier = next_frontier;
     }
-}
-
-void
-session::request_fwd_revisions(revision_id const & i,
-                               map<revision_id, bool> attached,
-                               set<revision_id> & visited)
-{
-  if (visited.find(i) != visited.end())
-    return;

-  visited.insert(i);
-
-  L(F("visiting revision '%s' for forward deltas\n") % i);
+  set_difference(nodes.begin(), nodes.end(),
+                 parents.begin(), parents.end(),
+                 inserter(heads, heads.begin()));

-  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
-
-  ancestryT::const_iterator j = ancestry.find(i);
-  if (j != ancestry.end())
-    {
-      edge_map::const_iterator an_attached_edge = j->second->second.edges.end();
+  L(F("intermediate set_difference heads size %d") % heads.size());

-      // first make sure we've requested enough to get to here by
-      // calling ourselves recursively. this is the forward path after all.
+  // Write permissions checking:
+  // remove heads w/o proper certs, add their children to heads
+  // 1) remove unwanted branch certs from consideration
+  // 2) remove heads w/o a branch tag, process new exposed heads
+  // 3) repeat 2 until no change

-      for (edge_map::const_iterator k = j->second->second.edges.begin();
-           k != j->second->second.edges.end(); ++k)
+  //1
+  set<string> ok_branches, bad_branches;
+  cert_name bcert_name(branch_cert_name);
+  cert_name tcert_name(tag_cert_name);
+  for (map<revision_id, cert_map>::iterator i = received_certs.begin();
+      i != received_certs.end(); ++i)
+    {
+      //branches
+      vector<cert> & bcerts(i->second[bcert_name]);
+      vector<cert> keeping;
+      for (vector<cert>::iterator j = bcerts.begin(); j != bcerts.end(); ++j)
         {
-          if (is_attached(edge_old_revision(k), attached))
-            {
-              request_fwd_revisions(edge_old_revision(k), attached, visited);
-              an_attached_edge = k;
-            }
-        }
-
-      I(an_attached_edge != j->second->second.edges.end());
-
-      // check out the manifest delta edge
-      manifest_id parent_manifest = edge_old_manifest(an_attached_edge);
-      manifest_id child_manifest = j->second->second.new_manifest;
-      if (this->app.db.manifest_version_exists(child_manifest))
-        L(F("not requesting forward manifest delta to '%s' as we already have it\n")
-          % child_manifest);
-      else
-        {
-          if (parent_manifest.inner()().empty())
-            {
-              L(F("requesting full manifest data %s\n") % child_manifest);
-              queue_send_data_cmd(manifest_item, plain_id(child_manifest));
-            }
+          cert_value name;
+          decode_base64(j->value, name);
+          if (ok_branches.find(name()) != ok_branches.end())
+            keeping.push_back(*j);
+          else if (bad_branches.find(name()) != bad_branches.end())
+            ;
           else
             {
-              L(F("requesting forward manifest delta %s -> %s\n")
-                % parent_manifest % child_manifest);
-              queue_send_delta_cmd(manifest_item,
-                                   plain_id(parent_manifest),
-                                   plain_id(child_manifest));
+              if (our_matcher(name()))
+                {
+                  ok_branches.insert(name());
+                  keeping.push_back(*j);
+                }
+              else
+                {
+                  bad_branches.insert(name());
+                  W(F("Dropping branch certs for unwanted branch %s")
+                    % name);
+                }
             }
         }
+      bcerts = keeping;
+    }
+  //2
+  list<revision_id> tmp;
+  for (set<revision_id>::iterator i = heads.begin(); i != heads.end(); ++i)
+    {
+      if (!received_certs[*i][bcert_name].size())
+        tmp.push_back(*i);
+    }
+  for (list<revision_id>::iterator i = tmp.begin(); i != tmp.end(); ++i)
+    heads.erase(*i);

-      // check out each file delta edge
-      change_set const & an_attached_cset = edge_changes(an_attached_edge);
-      for (change_set::delta_map::const_iterator k = an_attached_cset.deltas.begin();
-           k != an_attached_cset.deltas.end(); ++k)
+  L(F("after step 2, heads size %d") % heads.size());
+  //3
+  while (tmp.size())
+    {
+      ancestryT::const_iterator i = ancestry.find(tmp.front());
+      if (i != ancestry.end())
         {
-          if (this->app.db.file_version_exists(delta_entry_dst(k)))
-            L(F("not requesting forward delta %s -> %s on file %s as we already have it\n")
-              % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k));
-          else
+          for (edge_map::const_iterator j = i->second->second.edges.begin();
+               j != i->second->second.edges.end(); ++j)
             {
-              if (delta_entry_src(k).inner()().empty())
+              if (!--chld_num[edge_old_revision(j)])
                 {
-                  L(F("requesting full file data %s\n") % delta_entry_dst(k));
-                  queue_send_data_cmd(file_item, plain_id(delta_entry_dst(k)));
+                  if (received_certs[i->first][bcert_name].size())
+                    heads.insert(i->first);
+                  else
+                    tmp.push_back(edge_old_revision(j));
                 }
-              else
-                {
-
-                  L(F("requesting forward delta %s -> %s on file %s\n")
-                    % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k));
-                  queue_send_delta_cmd(file_item,
-                                       plain_id(delta_entry_src(k)),
-                                       plain_id(delta_entry_dst(k)));
-                }
             }
+          // since we don't want this rev, we don't want it's certs either
+          received_certs[tmp.front()] = cert_map();
         }
-      // now actually consume the data packet, which will wait on the
-      // arrival of its prerequisites in the packet_db_writer
-      this->dbw.consume_revision_data(j->first, j->second->first);
+        tmp.pop_front();
     }
-}

-void
-session::analyze_ancestry_graph()
-{
-  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
-  typedef map<cert_name, vector<cert> > cert_map;
-
-  if (! (all_requested_revisions_received() && cert_refinement_done()))
-    return;
-
-  if (analyzed_ancestry)
-    return;
-
-  set<revision_id> heads;
-  {
-    set<revision_id> nodes, parents;
-    map<revision_id, int> chld_num;
-    L(F("analyzing %d ancestry edges\n") % ancestry.size());
-
-    for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
-      {
-        nodes.insert(i->first);
-        for (edge_map::const_iterator j = i->second->second.edges.begin();
-             j != i->second->second.edges.end(); ++j)
-          {
-            parents.insert(edge_old_revision(j));
-            map<revision_id, int>::iterator n;
-            n = chld_num.find(edge_old_revision(j));
-            if (n == chld_num.end())
-              chld_num.insert(make_pair(edge_old_revision(j), 1));
-            else
-              ++(n->second);
-          }
-      }
-
-    set_difference(nodes.begin(), nodes.end(),
-                   parents.begin(), parents.end(),
-                   inserter(heads, heads.begin()));
-
-    // Write permissions checking:
-    // remove heads w/o proper certs, add their children to heads
-    // 1) remove unwanted branch certs from consideration
-    // 2) remove heads w/o a branch tag, process new exposed heads
-    // 3) repeat 2 until no change
-
-    //1
-    set<string> ok_branches, bad_branches;
-    cert_name bcert_name(branch_cert_name);
-    cert_name tcert_name(tag_cert_name);
-    for (map<revision_id, cert_map>::iterator i = received_certs.begin();
-        i != received_certs.end(); ++i)
-      {
-        //branches
-        vector<cert> & bcerts(i->second[bcert_name]);
-        vector<cert> keeping;
-        for (vector<cert>::iterator j = bcerts.begin(); j != bcerts.end(); ++j)
-          {
-            cert_value name;
-            decode_base64(j->value, name);
-            if (ok_branches.find(name()) != ok_branches.end())
-              keeping.push_back(*j);
-            else if (bad_branches.find(name()) != bad_branches.end())
-              ;
-            else
-              {
-                if (our_matcher(name()))
-                  {
-                    ok_branches.insert(name());
-                    keeping.push_back(*j);
-                  }
-                else
-                  {
-                    bad_branches.insert(name());
-                    W(F("Dropping branch certs for unwanted branch %s")
-                      % name);
-                  }
-              }
-          }
-        bcerts = keeping;
-      }
-    //2
-    list<revision_id> tmp;
-    for (set<revision_id>::iterator i = heads.begin(); i != heads.end(); ++i)
-      {
-        if (!received_certs[*i][bcert_name].size())
-          tmp.push_back(*i);
-      }
-    for (list<revision_id>::iterator i = tmp.begin(); i != tmp.end(); ++i)
-      heads.erase(*i);
-    //3
-    while (tmp.size())
-      {
-        ancestryT::const_iterator i = ancestry.find(tmp.front());
-        if (i != ancestry.end())
-          {
-            for (edge_map::const_iterator j = i->second->second.edges.begin();
-                 j != i->second->second.edges.end(); ++j)
-              {
-                if (!--chld_num[edge_old_revision(j)])
-                  {
-                    if (received_certs[i->first][bcert_name].size())
-                      heads.insert(i->first);
-                    else
-                      tmp.push_back(edge_old_revision(j));
-                  }
-              }
-            // since we don't want this rev, we don't want it's certs either
-            received_certs[tmp.front()] = cert_map();
-          }
-          tmp.pop_front();
-      }
-  }
-
+  L(F("after step 3, heads size %d") % heads.size());
   // We've reduced the certs to those we want now, send them to dbw.
   for (map<revision_id, cert_map>::iterator i = received_certs.begin();
       i != received_certs.end(); ++i)
@@ -1299,43 +1060,28 @@ session::analyze_ancestry_graph()
             }
         }
     }
+}

-  L(F("isolated %d heads\n") % heads.size());
+void
+session::analyze_ancestry_graph()
+{
+  L(F("analyze_ancestry_graph"));
+  if (! (all_requested_revisions_received() && cert_refinement_done()))
+    {
+      L(F("not all done in analyze_ancestry_graph"));
+      return;
+    }

-  // first we determine the "attachment status" of each node in our ancestry
-  // graph.
+  if (analyzed_ancestry)
+    {
+      L(F("already analyzed_ancestry in analyze_ancestry_graph"));
+      return;
+    }

-  map<revision_id, bool> attached;
-  set<revision_id> visited;
-  for (set<revision_id>::const_iterator i = heads.begin();
-       i != heads.end(); ++i)
-    analyze_attachment(*i, visited, attached);
+  L(F("analyze_ancestry_graph fetching"));

-  // then we walk the graph upwards, recursively, starting from each of the
-  // heads. we either walk requesting forward deltas or reverse deltas,
-  // depending on whether we are walking an attached or detached subgraph,
-  // respectively. the forward walk ignores detached nodes, the backward walk
-  // ignores attached nodes.
+  ancestry_fetcher fetch(*this);

-  set<revision_id> fwd_visited, rev_visited;
-
-  for (set<revision_id>::const_iterator i = heads.begin();
-       i != heads.end(); ++i)
-    {
-      map<revision_id, bool>::const_iterator k = attached.find(*i);
-      I(k != attached.end());
-
-      if (k->second)
-        {
-          L(F("requesting attached ancestry of revision '%s'\n") % *i);
-          request_fwd_revisions(*i, attached, fwd_visited);
-        }
-      else
-        {
-          L(F("requesting detached ancestry of revision '%s'\n") % *i);
-          request_rev_revisions(*i, attached, rev_visited);
-        }
-    }
   analyzed_ancestry = true;
 }

@@ -2921,7 +2667,15 @@ session::process_delta_cmd(netcmd_item_t
     case manifest_item:
       {
         manifest_id src_manifest(hbase), dst_manifest(hident);
-        if (reverse_delta_requests.find(id_pair)
+        if (full_delta_items[manifest_item]->find(ident)
+            != full_delta_items[manifest_item]->end())
+          {
+            this->dbw.consume_manifest_delta(src_manifest,
+                                             dst_manifest,
+                                             manifest_delta(del),
+                                             true);
+          }
+        else if (reverse_delta_requests.find(id_pair)
             != reverse_delta_requests.end())
           {
             reverse_delta_requests.erase(id_pair);
@@ -2940,7 +2694,15 @@ session::process_delta_cmd(netcmd_item_t
     case file_item:
       {
         file_id src_file(hbase), dst_file(hident);
-        if (reverse_delta_requests.find(id_pair)
+        if (full_delta_items[file_item]->find(ident)
+            != full_delta_items[file_item]->end())
+          {
+            this->dbw.consume_file_delta(src_file,
+                                             dst_file,
+                                             file_delta(del),
+                                             true);
+          }
+        else if (reverse_delta_requests.find(id_pair)
             != reverse_delta_requests.end())
           {
             reverse_delta_requests.erase(id_pair);
@@ -3863,3 +3625,362 @@ run_netsync_protocol(protocol_voice voic
   end_platform_netsync();
 }

+
+// Steps for determining files/manifests to request, from
+// a given revision ancestry:
+//
+// 1) find the new heads, consume valid branch certs etc.
+//
+// 2) foreach new head, traverse up the revision ancestry, building
+// a set of reverse file/manifest deltas (we stop when we hit an
+// already-seen or existing-in-db rev). At the same time, build a
+// smaller set of forward deltas to files/manifests which exist in the
+// head revisions.
+//
+// 3) For each file/manifest in head, first request the forward delta
+// (or full data if there is no path back to existing data). Then
+// traverse up the set of reverse deltas, daisychaining our way until
+// we get to existing revisions.
+//
+// Notes:
+//
+// - The database stores reverse deltas, so preferring these allows
+// a server to send pre-computed deltas straight from the database
+// (this isn't done yet). In order to bootstrap the tip-most data,
+// forward deltas from a close(est?)-ancestor are used, or full data
+// is requested if there is no existing ancestor.
+//
+// eg, if we have the (manifest) ancestry
+// A -> B -> C -> D
+// where A is existing, {B,C,D} are new, then we will request deltas
+// A->D (fwd)
+// D->C (rev)
+// C->B (rev)
+// This may result in slightly larger deltas than using all forward
+// deltas, however it should be more efficient.
+//
+// - in order to keep a good hit ratio with the reconstructed version
+// cache in database, we'll request deltas for a single file/manifest
+// all at once, rather than requesting deltas per-revision. This
+// requires a bit more memory usage, though it will be less memory
+// than would be required to store all the outgoing delta requests
+// anyway.
+ancestry_fetcher::ancestry_fetcher(session & s)
+    : sess(s)
+{
+  set<revision_id> new_heads;
+  sess.get_heads_and_consume_certs( new_heads );
+
+  L(F("ancestry_fetcher: got %d heads") % new_heads.size());
+
+  traverse_ancestry(new_heads);
+
+  request_files();
+  request_manifests();
+}
+
+void
+ancestry_fetcher::traverse_files(change_set const & cset)
+{
+  for (change_set::delta_map::const_iterator d = cset.deltas.begin();
+       d != cset.deltas.end(); ++d)
+    {
+      file_id parent_file (delta_entry_src(d));
+      file_id child_file (delta_entry_dst(d));
+      L(F("traverse_files parent %s child %s")
+        % parent_file % child_file);
+
+      I(!(parent_file == child_file));
+      // XXX when changeset format is altered to have [...]->[] deltas on deletion,
+      // this assertion needs revisiting
+      I(!null_id(child_file));
+
+      // request the reverse delta
+      if (!null_id(parent_file))
+        {
+          L(F("inserting file rev_deltas"));
+          rev_file_deltas.insert(make_pair(child_file, parent_file));
+        }
+
+      // add any new forward deltas
+      if (seen_files.find(child_file) == seen_files.end())
+        {
+          L(F("inserting fwd_jump_deltas"));
+          fwd_file_deltas.insert( make_pair( parent_file, child_file ) );
+        }
+
+      // update any forward deltas. no point updating if it already
+      // points to something we have.
+      if (!null_id(parent_file)
+          && fwd_file_deltas.find(child_file) != fwd_file_deltas.end())
+        {
+          // We're traversing with child->parent of A->B.
+          // Update any forward deltas with a parent of B to
+          // have A as a parent, ie B->C becomes A->C.
+          for (multimap<file_id,file_id>::iterator d =
+               fwd_file_deltas.lower_bound(child_file);
+               d != fwd_file_deltas.upper_bound(child_file);
+               d++)
+            {
+              fwd_file_deltas.insert(make_pair(parent_file, d->second));
+            }
+
+            fwd_file_deltas.erase(fwd_file_deltas.lower_bound(child_file),
+                                  fwd_file_deltas.upper_bound(child_file));
+        }
+
+      seen_files.insert(child_file);
+      seen_files.insert(parent_file);
+    }
+}
+
+void
+ancestry_fetcher::traverse_manifest(manifest_id const & child_man,
+                                    manifest_id const & parent_man)
+{
+  L(F("traverse_manifest parent %s child %s")
+    % parent_man % child_man);
+  I(!null_id(child_man));
+  // add reverse deltas
+  if (!null_id(parent_man))
+    {
+      L(F("inserting manifest rev_deltas"));
+      rev_manifest_deltas.insert(make_pair(child_man, parent_man));
+    }
+
+  // handle the manifest forward-deltas
+  if (!null_id(parent_man)
+      // don't update child to itself, it makes the loop iterate infinitely.
+      && !(parent_man == child_man)
+      && fwd_manifest_deltas.find(child_man) != fwd_manifest_deltas.end())
+    {
+      // We're traversing with child->parent of A->B.
+      // Update any forward deltas with a parent of B to
+      // have A as a parent, ie B->C becomes A->C.
+      for (multimap<manifest_id,manifest_id>::iterator d =
+           fwd_manifest_deltas.lower_bound(child_man);
+           d != fwd_manifest_deltas.upper_bound(child_man);
+           d++)
+        {
+          L(F("size %d\n") % fwd_manifest_deltas.size());
+          L(F("inserting %s->%s") % parent_man % d->second);
+          fwd_manifest_deltas.insert(make_pair(parent_man, d->second));
+        }
+
+      fwd_manifest_deltas.erase(fwd_manifest_deltas.lower_bound(child_man),
+                                fwd_manifest_deltas.upper_bound(child_man));
+    }
+}
+
+void
+ancestry_fetcher::traverse_ancestry(set<revision_id> const & heads)
+{
+  deque<revision_id> frontier;
+  set<revision_id> seen_revs;
+
+  for (set<revision_id>::const_iterator h = heads.begin();
+       h != heads.end(); h++)
+    {
+      L(F("inserting head %s") % *h);
+      frontier.push_back(*h);
+      seen_revs.insert(*h);
+      manifest_id const & m = sess.ancestry[*h]->second.new_manifest;
+      fwd_manifest_deltas.insert(make_pair(m,m));
+    }
+
+  // breadth first up the ancestry
+  while (!frontier.empty())
+    {
+      revision_id const & rev = frontier.front();
+
+      L(F("frontier %s") % rev);
+      I(sess.ancestry.find(rev) != sess.ancestry.end());
+
+      for (edge_map::const_iterator e = sess.ancestry[rev]->second.edges.begin();
+           e != sess.ancestry[rev]->second.edges.end(); e++)
+        {
+          revision_id const & par = edge_old_revision(e);
+          if (seen_revs.find(par) == seen_revs.end())
+            {
+              if (sess.ancestry.find(par) != sess.ancestry.end())
+                {
+                  L(F("push_back to frontier %s") % par);
+                  frontier.push_back(par);
+                }
+              seen_revs.insert(par);
+            }
+
+          traverse_manifest(sess.ancestry[rev]->second.new_manifest,
+                            edge_old_manifest(e));
+          traverse_files(edge_changes(e));
+
+        }
+
+      sess.dbw.consume_revision_data(rev, sess.ancestry[rev]->first);
+      frontier.pop_front();
+    }
+}
+
+void
+ancestry_fetcher::request_rev_file_deltas(file_id const & start,
+                                          set<file_id> & done_files)
+{
+  stack< file_id > frontier;
+  frontier.push(start);
+
+  while (!frontier.empty())
+    {
+      file_id const child = frontier.top();
+      I(!null_id(child));
+      frontier.pop();
+
+      for (multimap< file_id, file_id>::const_iterator
+           d = rev_file_deltas.lower_bound(child);
+           d != rev_file_deltas.upper_bound(child);
+           d++)
+        {
+          file_id const & parent = d->second;
+          I(!null_id(parent));
+          if (done_files.find(parent) == done_files.end())
+            {
+              done_files.insert(parent);
+              if (!sess.app.db.file_version_exists(parent))
+                {
+                  L(F("requesting reverse file delta %s->%s")
+                    % child % parent);
+                  sess.queue_send_delta_cmd(file_item,
+                                            plain_id(child), plain_id(parent));
+                  sess.reverse_delta_requests.insert(make_pair(plain_id(child),
+                                                               plain_id(parent)));
+                }
+              else
+                {
+                  L(F("file %s exists, not requesting rev delta")
+                    % parent);
+                }
+              frontier.push(parent);
+            }
+        }
+    }
+}
+
+void
+ancestry_fetcher::request_files()
+{
+  // just a cache to avoid checking db.foo_version_exists() too much
+  set<file_id> done_files;
+
+  for (multimap<file_id,file_id>::const_iterator d = fwd_file_deltas.begin();
+       d != fwd_file_deltas.end(); d++)
+    {
+      file_id const & anc = d->first;
+      file_id const & child = d->second;
+      if (!sess.app.db.file_version_exists(child))
+        {
+          if (null_id(anc)
+              || !sess.app.db.file_version_exists(anc))
+            {
+              L(F("requesting full file %s") % child);
+              sess.queue_send_data_cmd(file_item, plain_id(child));
+            }
+          else
+            {
+              L(F("requesting forward delta %s->%s")
+                % anc % child);
+              sess.queue_send_delta_cmd(file_item,
+                                        plain_id(anc), plain_id(child));
+              sess.note_item_full_delta(file_item, plain_id(child));
+            }
+        }
+      else
+        {
+          L(F("not requesting fwd delta %s->%s, already have dst")
+            % anc % child);
+        }
+
+      // traverse up the reverse deltas
+      request_rev_file_deltas(child, done_files);
+    }
+}
+
+void
+ancestry_fetcher::request_rev_manifest_deltas(manifest_id const & start,
+                                              set<manifest_id> & done_manifests)
+{
+  stack< manifest_id > frontier;
+  frontier.push(start);
+
+  while (!frontier.empty())
+    {
+      manifest_id const child = frontier.top();
+      I(!null_id(child));
+      frontier.pop();
+
+      for (multimap< manifest_id, manifest_id>::const_iterator
+           d = rev_manifest_deltas.lower_bound(child);
+           d != rev_manifest_deltas.upper_bound(child);
+           d++)
+        {
+          manifest_id const & parent = d->second;
+          I(!null_id(parent));
+          if (done_manifests.find(parent) == done_manifests.end())
+            {
+              done_manifests.insert(parent);
+              if (!sess.app.db.manifest_version_exists(parent))
+                {
+                  L(F("requesting reverse manifest delta %s->%s")
+                    % child % parent);
+                  sess.queue_send_delta_cmd(manifest_item,
+                                            plain_id(child), plain_id(parent));
+                  sess.reverse_delta_requests.insert(make_pair(plain_id(child),
+                                                               plain_id(parent)));
+                }
+              else
+                {
+                  L(F("manifest %s exists, not requesting rev delta")
+                    % parent);
+                }
+              frontier.push(parent);
+            }
+        }
+    }
+}
+
+void
+ancestry_fetcher::request_manifests()
+{
+  // just a cache to avoid checking db.foo_version_exists() too much
+  set<manifest_id> done_manifests;
+
+  for (multimap<manifest_id,manifest_id>::const_iterator d = fwd_manifest_deltas.begin();
+       d != fwd_manifest_deltas.end(); d++)
+    {
+      manifest_id const & anc = d->first;
+      manifest_id const & child = d->second;
+      if (!sess.app.db.manifest_version_exists(child))
+        {
+          if (null_id(anc)
+              || !sess.app.db.manifest_version_exists(anc))
+            {
+              L(F("requesting full manifest %s") % child);
+              sess.queue_send_data_cmd(manifest_item, plain_id(child));
+            }
+          else
+            {
+              L(F("requesting forward delta %s->%s")
+                % anc % child);
+              sess.queue_send_delta_cmd(manifest_item,
+                                        plain_id(anc), plain_id(child));
+              sess.note_item_full_delta(manifest_item, plain_id(child));
+            }
+        }
+      else
+        {
+          L(F("not requesting fwd delta %s->%s, already have dst")
+            % anc % child);
+        }
+
+      // traverse up the reverse deltas
+      request_rev_manifest_deltas(child, done_manifests);
+    }
+}