The unified diff between revisions [70e9ba55..] and [2227bef1..] is displayed below. It can also be downloaded as a raw diff.

#
#
# patch "netsync.cc"
#  from [1a08b80949f7458e7868d09b0647dbdfd2f5d645]
#    to [2158f0f1f9e8f5e83aa4a5dfefa05d104bdb0a0e]
#
# patch "packet.cc"
#  from [407e15cf1c485e09665a372588be2f99491a17d7]
#    to [0867d1963018a7de23a7ca07be1e637817aaeb50]
#
# patch "packet.hh"
#  from [ec7178c2332473305c0aa7d00c727e338fc7810d]
#    to [c2d6f03ffa901f98ad6623090b687982d27db552]
#
============================================================
--- netsync.cc	1a08b80949f7458e7868d09b0647dbdfd2f5d645
+++ netsync.cc	2158f0f1f9e8f5e83aa4a5dfefa05d104bdb0a0e
@@ -8,6 +8,7 @@
 #include <memory>
 #include <list>
 #include <deque>
+#include <stack>

 #include <time.h>

@@ -359,15 +360,6 @@ session

   void get_heads_and_consume_certs(set<revision_id> & heads);

-  void analyze_attachment(revision_id const & i,
-                          set<revision_id> & visited,
-                          map<revision_id, bool> & attached);
-  void request_rev_revisions(revision_id const & init,
-                             map<revision_id, bool> attached,
-                             set<revision_id> visited);
-  void request_fwd_revisions(revision_id const & i,
-                             map<revision_id, bool> attached,
-                             set<revision_id> & visited);
   void analyze_ancestry_graph();
   void analyze_manifest(manifest_map const & man);

@@ -469,26 +461,23 @@ ancestry_fetcher
 {
   session & sess;

-  set< revision_id> seen_revs;
   // map children to parents
-  map< file_id, set<file_id> > rev_file_deltas;
+  multimap< file_id, file_id > rev_file_deltas;
+  multimap< manifest_id, manifest_id > rev_manifest_deltas;
   // map an ancestor to a child
-  map< file_id, file_id > fwd_file_deltas;
-  set< file_id > full_files;
-  map< manifest_id, set<manifest_id> > rev_manifest_deltas;
-  map< manifest_id, manifest_id > fwd_manifest_deltas;
-  set< manifest_id > full_manifests;
+  multimap< file_id, file_id > fwd_file_deltas;
+  multimap< manifest_id, manifest_id > fwd_manifest_deltas;
+
   set< file_id > seen_files;
-  set< file_id > anchor_files;
-  set< manifest_id > anchor_manifests;

   ancestry_fetcher(session & s);
-  void traverse_files(change_set const & cset,
-                      map< file_id, file_id > & fwd_jump_deltas);
+  // analysing the ancestry graph
+  void traverse_files(change_set const & cset);
   void traverse_manifest(manifest_id const & child_man,
-                         manifest_id const & parent_man,
-                         manifest_id & fwd_anc);
-  void traverse_ancestry(revision_id const & head);
+                         manifest_id const & parent_man);
+  void traverse_ancestry(set<revision_id> const & heads);
+
+  // requesting the data
   void request_rev_file_deltas(file_id const & start);
   void request_files();
   void request_rev_manifest_deltas(manifest_id const & start);
@@ -926,67 +915,6 @@ session::analyze_manifest(manifest_map c
     }
 }

-static bool
-is_attached(revision_id const & i,
-            map<revision_id, bool> const & attach_map)
-{
-  map<revision_id, bool>::const_iterator j = attach_map.find(i);
-  I(j != attach_map.end());
-  return j->second;
-}
-
-// this tells us whether a particular revision is "attached" -- meaning
-// either our database contains the underlying manifest or else one of our
-// parents (recursively, and only in the current ancestry graph we're
-// requesting) is attached. if it's detached we will request it using a
-// different (more efficient and less failure-prone) algorithm
-
-void
-session::analyze_attachment(revision_id const & i,
-                            set<revision_id> & visited,
-                            map<revision_id, bool> & attached)
-{
-  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
-
-  if (visited.find(i) != visited.end())
-    return;
-
-  visited.insert(i);
-
-  bool curr_attached = false;
-
-  if (app.db.revision_exists(i))
-    {
-      L(F("revision %s is attached via database\n") % i);
-      curr_attached = true;
-    }
-  else
-    {
-      L(F("checking attachment of %s in ancestry\n") % i);
-      ancestryT::const_iterator j = ancestry.find(i);
-      if (j != ancestry.end())
-        {
-          for (edge_map::const_iterator k = j->second->second.edges.begin();
-               k != j->second->second.edges.end(); ++k)
-            {
-              L(F("checking attachment of %s in parent %s\n") % i % edge_old_revision(k));
-              analyze_attachment(edge_old_revision(k), visited, attached);
-              if (is_attached(edge_old_revision(k), attached))
-                {
-                  L(F("revision %s is attached via parent %s\n") % i % edge_old_revision(k));
-                  curr_attached = true;
-                }
-            }
-        }
-    }
-  if (curr_attached)
-    L(F("decided that revision '%s' is attached\n") % i);
-  else
-    L(F("decided that revision '%s' is not attached\n") % i);
-
-  attached[i] = curr_attached;
-}
-
 inline static id
 plain_id(manifest_id const & i)
 {
@@ -1005,221 +933,6 @@ plain_id(file_id const & i)
   return tmp;
 }

-void
-session::request_rev_revisions(revision_id const & init,
-                               map<revision_id, bool> attached,
-                               set<revision_id> visited)
-{
-  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
-
-  set<manifest_id> seen_manifests;
-  set<file_id> seen_files;
-
-  set<revision_id> frontier;
-  frontier.insert(init);
-  while(!frontier.empty())
-    {
-      set<revision_id> next_frontier;
-      for (set<revision_id>::const_iterator i = frontier.begin();
-           i != frontier.end(); ++i)
-        {
-          if (is_attached(*i, attached))
-            continue;
-
-          if (visited.find(*i) != visited.end())
-            continue;
-
-          visited.insert(*i);
-
-          ancestryT::const_iterator j = ancestry.find(*i);
-          if (j != ancestry.end())
-            {
-
-              for (edge_map::const_iterator k = j->second->second.edges.begin();
-                   k != j->second->second.edges.end(); ++k)
-                {
-
-                  next_frontier.insert(edge_old_revision(k));
-
-                  // check out the manifest delta edge
-                  manifest_id parent_manifest = edge_old_manifest(k);
-                  manifest_id child_manifest = j->second->second.new_manifest;
-
-                  // first, if we have a child we've never seen before we will need
-                  // to request it in its entrety.
-                  if (seen_manifests.find(child_manifest) == seen_manifests.end())
-                    {
-                      if (this->app.db.manifest_version_exists(child_manifest))
-                        L(F("not requesting (in reverse) initial manifest %s as we already have it\n") % child_manifest);
-                      else
-                        {
-                          L(F("requesting (in reverse) initial manifest data %s\n") % child_manifest);
-                          queue_send_data_cmd(manifest_item, plain_id(child_manifest));
-                        }
-                      seen_manifests.insert(child_manifest);
-                    }
-
-                  // second, if the parent is nonempty, we want to ask for an edge to it
-                  if (!parent_manifest.inner()().empty())
-                    {
-                      if (this->app.db.manifest_version_exists(parent_manifest))
-                        L(F("not requesting (in reverse) manifest delta to %s as we already have it\n") % parent_manifest);
-                      else
-                        {
-                          L(F("requesting (in reverse) manifest delta %s -> %s\n")
-                            % child_manifest % parent_manifest);
-                          reverse_delta_requests.insert(make_pair(plain_id(child_manifest),
-                                                                  plain_id(parent_manifest)));
-                          queue_send_delta_cmd(manifest_item,
-                                               plain_id(child_manifest),
-                                               plain_id(parent_manifest));
-                        }
-                      seen_manifests.insert(parent_manifest);
-                    }
-
-
-
-                  // check out each file delta edge
-                  change_set const & cset = edge_changes(k);
-                  for (change_set::delta_map::const_iterator d = cset.deltas.begin();
-                       d != cset.deltas.end(); ++d)
-                    {
-                      file_id parent_file (delta_entry_src(d));
-                      file_id child_file (delta_entry_dst(d));
-
-
-                      // first, if we have a child we've never seen before we will need
-                      // to request it in its entrety.
-                      if (seen_files.find(child_file) == seen_files.end())
-                        {
-                          if (this->app.db.file_version_exists(child_file))
-                            L(F("not requesting (in reverse) initial file %s as we already have it\n") % child_file);
-                          else
-                            {
-                              L(F("requesting (in reverse) initial file data %s\n") % child_file);
-                              queue_send_data_cmd(file_item, plain_id(child_file));
-                            }
-                          seen_files.insert(child_file);
-                        }
-
-                      // second, if the parent is nonempty, we want to ask for an edge to it
-                      if (!parent_file.inner()().empty())
-                        {
-                          if (this->app.db.file_version_exists(parent_file))
-                            L(F("not requesting (in reverse) file delta to %s as we already have it\n") % parent_file);
-                          else
-                            {
-                              L(F("requesting (in reverse) file delta %s -> %s on %s\n")
-                                % child_file % parent_file % delta_entry_path(d));
-                              reverse_delta_requests.insert(make_pair(plain_id(child_file),
-                                                                      plain_id(parent_file)));
-                              queue_send_delta_cmd(file_item,
-                                                   plain_id(child_file),
-                                                   plain_id(parent_file));
-                            }
-                          seen_files.insert(parent_file);
-                        }
-                    }
-                }
-
-              // now actually consume the data packet, which will wait on the
-              // arrival of its prerequisites in the packet_db_writer
-              this->dbw.consume_revision_data(j->first, j->second->first);
-            }
-        }
-      frontier = next_frontier;
-    }
-}
-
-void
-session::request_fwd_revisions(revision_id const & i,
-                               map<revision_id, bool> attached,
-                               set<revision_id> & visited)
-{
-  if (visited.find(i) != visited.end())
-    return;
-
-  visited.insert(i);
-
-  L(F("visiting revision '%s' for forward deltas\n") % i);
-
-  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
-
-  ancestryT::const_iterator j = ancestry.find(i);
-  if (j != ancestry.end())
-    {
-      edge_map::const_iterator an_attached_edge = j->second->second.edges.end();
-
-      // first make sure we've requested enough to get to here by
-      // calling ourselves recursively. this is the forward path after all.
-
-      for (edge_map::const_iterator k = j->second->second.edges.begin();
-           k != j->second->second.edges.end(); ++k)
-        {
-          if (is_attached(edge_old_revision(k), attached))
-            {
-              request_fwd_revisions(edge_old_revision(k), attached, visited);
-              an_attached_edge = k;
-            }
-        }
-
-      I(an_attached_edge != j->second->second.edges.end());
-
-      // check out the manifest delta edge
-      manifest_id parent_manifest = edge_old_manifest(an_attached_edge);
-      manifest_id child_manifest = j->second->second.new_manifest;
-      if (this->app.db.manifest_version_exists(child_manifest))
-        L(F("not requesting forward manifest delta to '%s' as we already have it\n")
-          % child_manifest);
-      else
-        {
-          if (parent_manifest.inner()().empty())
-            {
-              L(F("requesting full manifest data %s\n") % child_manifest);
-              queue_send_data_cmd(manifest_item, plain_id(child_manifest));
-            }
-          else
-            {
-              L(F("requesting forward manifest delta %s -> %s\n")
-                % parent_manifest % child_manifest);
-              queue_send_delta_cmd(manifest_item,
-                                   plain_id(parent_manifest),
-                                   plain_id(child_manifest));
-            }
-        }
-
-      // check out each file delta edge
-      change_set const & an_attached_cset = edge_changes(an_attached_edge);
-      for (change_set::delta_map::const_iterator k = an_attached_cset.deltas.begin();
-           k != an_attached_cset.deltas.end(); ++k)
-        {
-          if (this->app.db.file_version_exists(delta_entry_dst(k)))
-            L(F("not requesting forward delta %s -> %s on file %s as we already have it\n")
-              % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k));
-          else
-            {
-              if (delta_entry_src(k).inner()().empty())
-                {
-                  L(F("requesting full file data %s\n") % delta_entry_dst(k));
-                  queue_send_data_cmd(file_item, plain_id(delta_entry_dst(k)));
-                }
-              else
-                {
-
-                  L(F("requesting forward delta %s -> %s on file %s\n")
-                    % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k));
-                  queue_send_delta_cmd(file_item,
-                                       plain_id(delta_entry_src(k)),
-                                       plain_id(delta_entry_dst(k)));
-                }
-            }
-        }
-      // now actually consume the data packet, which will wait on the
-      // arrival of its prerequisites in the packet_db_writer
-      this->dbw.consume_revision_data(j->first, j->second->first);
-    }
-}
-
 void
 session::get_heads_and_consume_certs( set<revision_id> & heads )
 {
@@ -1360,7 +1073,7 @@ session::analyze_ancestry_graph()
       return;
     }

-  L(F("analyze_ancestry_graph doing a fetch"));
+  L(F("analyze_ancestry_graph fetching"));

   ancestry_fetcher fetch(*this);

@@ -2949,7 +2662,15 @@ session::process_delta_cmd(netcmd_item_t
     case manifest_item:
       {
         manifest_id src_manifest(hbase), dst_manifest(hident);
-        if (reverse_delta_requests.find(id_pair)
+        if (full_delta_items[manifest_item]->find(ident)
+            != full_delta_items[manifest_item]->end())
+          {
+            this->dbw.consume_manifest_delta(src_manifest,
+                                             dst_manifest,
+                                             manifest_delta(del),
+                                             true);
+          }
+        else if (reverse_delta_requests.find(id_pair)
             != reverse_delta_requests.end())
           {
             reverse_delta_requests.erase(id_pair);
@@ -2968,13 +2689,13 @@ session::process_delta_cmd(netcmd_item_t
     case file_item:
       {
         file_id src_file(hbase), dst_file(hident);
-        if (full_delta_items[file_item].find(dst_file)
-            != full_delta_items[file_item].end())
+        if (full_delta_items[file_item]->find(ident)
+            != full_delta_items[file_item]->end())
           {
             this->dbw.consume_file_delta(src_file,
-                                         dst_file,
-                                         file_delta(del),
-                                         true);
+                                             dst_file,
+                                             file_delta(del),
+                                             true);
           }
         else if (reverse_delta_requests.find(id_pair)
             != reverse_delta_requests.end())
@@ -3872,82 +3593,106 @@ run_netsync_protocol(protocol_voice voic
   end_platform_netsync();
 }

+
+// Notes:
+//
+// - it is cheaper for both the source and for the sink to receive
+// reverse deltas (as opposed to forward deltas), since that's what
+// gets stored in the db. Hence, we try to always use reverse deltas.
+// If we have the (manifest) ancestry
+// A -> B -> C -> D
+// where A is existing, {B,C,D} are new, then we will request deltas
+// A->D (fwd)
+// D->C (rev)
+// C->B (rev)
+// This may result in slightly larger deltas than using all forward
+// deltas, however it is much more efficient.
+//
+// - in order to keep a good hit ratio with the reconstructed version
+// cache in database, we'll request deltas for a single file/manifest
+// all at once, rather than requesting deltas per-revision. This
+// requires a bit more memory usage, though it will be less memory
+// than would be required to store all the outgoing delta requests
+// anyway.
+//
+// Steps:
+// 1) get new heads, consume valid branch certs etc.
+//
+// 2) foreach new head, traverse up the revision ancestry building
+// a set of mainly reverse deltas, with forward deltas to files or
+// manifests which exist in the head revision.
+//
+// 3) remove any deltas which already exist, and make requests
+// for full files in the case where we don't already have the source
+// for a full delta.
+//
+// 4) For each file/manifest in head, traverse up the set of reverse
+// deltas requesting those deltas.
+//
 ancestry_fetcher::ancestry_fetcher(session & s)
     : sess(s)
 {
-
-  // Get the heads.
   set<revision_id> new_heads;
   sess.get_heads_and_consume_certs( new_heads );
+
   L(F("ancestry_fetcher: got %d heads") % new_heads.size());

-  // for each new head, we traverse up the ancestry until we reach an existing
-  // revision, an already-requested revision, or a root revision.
-  for (set<revision_id>::const_iterator h = new_heads.begin();
-       h != new_heads.end(); h++)
-    {
-      L(F("traverse head %s") % *h);
-      traverse_ancestry(*h);
-    }
+  traverse_ancestry(new_heads);

   request_files();
   request_manifests();
 }

 void
-ancestry_fetcher::traverse_files(change_set const & cset,
-                                 map< file_id, file_id > & fwd_jump_deltas)
+ancestry_fetcher::traverse_files(change_set const & cset)
 {
   for (change_set::delta_map::const_iterator d = cset.deltas.begin();
        d != cset.deltas.end(); ++d)
     {
-
       file_id parent_file (delta_entry_src(d));
       file_id child_file (delta_entry_dst(d));
+      L(F("traverse_files parent %s child %s")
+        % parent_file % child_file);

-      // surely we can assume this
       I(!(parent_file == child_file));
-
-      // when changeset format is altered to have ->"" deltas on deletion,
-      // this needs revisiting.
+      // XXX when changeset format is altered to have [...]->[] deltas on deletion,
+      // this assertion needs revisiting
       I(!null_id(child_file));

-      L(F("traverse_files parent %s child %s")
-        % parent_file % child_file);
+      // request the reverse delta
+      if (!null_id(parent_file))
+        {
+          L(F("inserting file rev_deltas"));
+          rev_file_deltas.insert(make_pair(child_file, parent_file));
+        }

-      // if this is the first time we've seen the child file_id,
-      // add it to the fwd-delta list.
+      // add any new forward deltas
       if (seen_files.find(child_file) == seen_files.end())
         {
-          L(F("unseen file %s, adding to anchor_files") % child_file);
-          anchor_files.insert(child_file);
-          if (!sess.app.db.file_version_exists(child_file))
-            {
-              L(F("inserting fwd_jump_deltas"));
-              fwd_jump_deltas.insert( make_pair( parent_file, child_file ) );
-            }
+          L(F("inserting fwd_jump_deltas"));
+          fwd_file_deltas.insert( make_pair( parent_file, child_file ) );
         }
-      else
+
+      // update any forward deltas. no point updating if it already
+      // points to something we have.
+      if (!null_id(parent_file)
+          && fwd_file_deltas.find(child_file) != fwd_file_deltas.end())
         {
-          // ... and update any fwd_jump_deltas.
-          // no point updating if it already references something we have.
-          if (fwd_jump_deltas.find(child_file) != fwd_jump_deltas.end()
-              && !sess.app.db.file_version_exists(child_file))
+          // We're traversing with child->parent of A->B.
+          // Update any forward deltas with a parent of B to
+          // have A as a parent, ie B->C becomes A->C.
+          for (multimap<file_id,file_id>::iterator d =
+               fwd_file_deltas.lower_bound(child_file);
+               d != fwd_file_deltas.upper_bound(child_file);
+               d++)
             {
-              L(F("updating fwd_jump_deltas for head file %s")
-                % fwd_jump_deltas[child_file]);
-              fwd_jump_deltas[parent_file] = fwd_jump_deltas[child_file];
-              fwd_jump_deltas.erase(child_file);
+              fwd_file_deltas.insert(make_pair(parent_file, d->second));
             }
-        }

-      // request the reverse delta
-      if (!sess.app.db.file_version_exists(parent_file)
-          && !null_id(parent_file))
-        {
-          L(F("inserting file rev_deltas"));
-          rev_file_deltas[child_file].insert(parent_file);
+            fwd_file_deltas.erase(fwd_file_deltas.lower_bound(child_file),
+                                  fwd_file_deltas.upper_bound(child_file));
         }
+
       seen_files.insert(child_file);
       seen_files.insert(parent_file);
     }
@@ -3955,152 +3700,132 @@ ancestry_fetcher::traverse_manifest(mani

 void
 ancestry_fetcher::traverse_manifest(manifest_id const & child_man,
-                                    manifest_id const & parent_man,
-                                    manifest_id & fwd_anc)
+                                    manifest_id const & parent_man)
 {
-
-
   L(F("traverse_manifest parent %s child %s")
     % parent_man % child_man);
+  I(!null_id(child_man));
   // add reverse deltas
-  if (!sess.app.db.manifest_version_exists(parent_man)
-      && !null_id(parent_man))
+  if (!null_id(parent_man))
     {
       L(F("inserting manifest rev_deltas"));
-      rev_manifest_deltas[child_man].insert(parent_man);
+      rev_manifest_deltas.insert(make_pair(child_man, parent_man));
     }

-  // handle the manifest forward-delta for this head
-  if (child_man == fwd_anc
-      && !sess.app.db.manifest_version_exists(child_man))
+  // handle the manifest forward-deltas
+  if (!null_id(parent_man)
+      && fwd_manifest_deltas.find(child_man) != fwd_manifest_deltas.end())
     {
-      L(F("set connected_manifest to %s (was %s)")
-        % parent_man % fwd_anc);
-      fwd_anc = parent_man;
-    }
+      // We're traversing with child->parent of A->B.
+      // Update any forward deltas with a parent of B to
+      // have A as a parent, ie B->C becomes A->C.
+      L(F("inserting manifest rev_deltas"));
+      for (multimap<manifest_id,manifest_id>::iterator d =
+           fwd_manifest_deltas.lower_bound(child_man);
+           d != fwd_manifest_deltas.upper_bound(child_man);
+           d++)
+        {
+          L(F("here %s,%s") % d->first % d->second);
+        }
+      for (multimap<manifest_id,manifest_id>::iterator d =
+           fwd_manifest_deltas.lower_bound(child_man);
+           d != fwd_manifest_deltas.upper_bound(child_man);
+           d++)
+        {
+          L(F("size %d\n") % fwd_manifest_deltas.size());
+          L(F("inserting %s->%s") % parent_man % d->second);
+          fwd_manifest_deltas.insert(make_pair(parent_man, d->second));
+          L(F("erasing %s,%s") % d->first % d->second);
+        }

+      fwd_manifest_deltas.erase(fwd_manifest_deltas.lower_bound(child_man),
+                                fwd_manifest_deltas.upper_bound(child_man));
+    }
 }

 void
-ancestry_fetcher::traverse_ancestry(revision_id const & head)
+ancestry_fetcher::traverse_ancestry(set<revision_id> const & heads)
 {
-  I(seen_revs.find(head) == seen_revs.end());
-  I(sess.ancestry.find(head) != sess.ancestry.end());
-
   deque<revision_id> frontier;
-  frontier.push_back(head);
+  set<revision_id> seen_revs;

-  // this map will get built up so that upon traversing to a known revision,
-  // we get pairs of ( existing_file, head_file ). along the way, it will
-  // contain pairs ( ancestor_file, head_file ), updated as the changeset
-  // is traversed.
-  map< file_id, file_id > fwd_jump_deltas;
-  // similarly for the manifest, we'll request it entirely
-  manifest_id m = sess.ancestry[head]->second.new_manifest;
-  manifest_id head_manifest;
-  if (!sess.app.db.manifest_version_exists(m))
+  for (set<revision_id>::const_iterator h = heads.begin();
+       h != heads.end(); h++)
     {
-      head_manifest = m;
+      L(F("inserting head %s") % *h);
+      frontier.push_back(*h);
+      seen_revs.insert(*h);
+      manifest_id const & m = sess.ancestry[*h]->second.new_manifest;
+      fwd_manifest_deltas.insert(make_pair(m,m));
     }
-  L(F("head_manifest %s, new_manifest %s") % head_manifest % m);
-  manifest_id connected_manifest = head_manifest;

   // breadth first up the ancestry
   while (!frontier.empty())
     {
-      revision_id rev = frontier.front();
-      L(F("ancestry frontier front %s") % rev);
+      revision_id const & rev = frontier.front();
+
+      L(F("frontier %s") % rev);
       I(sess.ancestry.find(rev) != sess.ancestry.end());

-      frontier.pop_front();
-
       for (edge_map::const_iterator e = sess.ancestry[rev]->second.edges.begin();
            e != sess.ancestry[rev]->second.edges.end(); e++)
         {
-          revision_id const & par = e->first;
+          revision_id const & par = edge_old_revision(e);
           if (seen_revs.find(par) == seen_revs.end())
             {
               if (sess.ancestry.find(par) != sess.ancestry.end())
                 {
-                  L(F("ancestry frontier push_back %s") % par);
+                  L(F("push_back to frontier %s") % par);
                   frontier.push_back(par);
                 }
-              else
-                {
-                  L(F("ancestry frontier not adding %s, isn't in ancestry") % par);
-                }
               seen_revs.insert(par);
             }

-          traverse_manifest(sess.ancestry[rev]->second.new_manifest, edge_old_manifest(e),
-                            connected_manifest);
-          traverse_files(edge_changes(e), fwd_jump_deltas);
+          traverse_manifest(sess.ancestry[rev]->second.new_manifest,
+                            edge_old_manifest(e));
+          traverse_files(edge_changes(e));

-          sess.dbw.consume_revision_data(rev, sess.ancestry[rev]->first);
         }
-    }

-  for (map<file_id,file_id>::const_iterator i = fwd_jump_deltas.begin();
-       i != fwd_jump_deltas.end(); i++)
-    {
-      if (null_id(i->first))
-        {
-          L(F("requesting full head file %s") % i->second);
-          full_files.insert(i->second);
-        }
-      else
-        {
-          fwd_file_deltas[i->first] = i->second;
-        }
+      sess.dbw.consume_revision_data(rev, sess.ancestry[rev]->first);
+      frontier.pop_front();
     }
-
-  if (!null_id(head_manifest))
-    {
-      if (null_id(connected_manifest))
-        {
-          L(F("requesting full manifest %s") % head_manifest);
-          full_manifests.insert(head_manifest);
-        }
-      else
-        {
-          L(F("requesting manifest fwd delta %s -> %s")
-            % connected_manifest % head_manifest);
-          fwd_manifest_deltas[connected_manifest] = head_manifest;
-        }
-    }
 }

 void
 ancestry_fetcher::request_rev_file_deltas(file_id const & start)
 {
-  vector< pair<file_id,file_id> > frontier;
-  frontier.push_back(make_pair(file_id(), start));
+  stack< file_id > frontier;
+  frontier.push(start);

-  L(F("request_rev_file_deltas: start %s, %d rev_file_deltas")
-    % start % rev_file_deltas.size());
-  // depth first
   while (!frontier.empty())
     {
-      file_id child = frontier.back().first;
-      file_id parent = frontier.back().second;
+      file_id const child = frontier.top();
+      I(!null_id(child));
+      frontier.pop();

-      // request it
-      if (!null_id(child))
+      for (multimap< file_id, file_id>::const_iterator
+           d = rev_file_deltas.lower_bound(child);
+           d != rev_file_deltas.upper_bound(child);
+           d++)
         {
-          L(F("requesting rev file delta child %s -> parent %s")
-            % child % parent);
-          sess.queue_send_delta_cmd(file_item,
-                               plain_id(child), plain_id(parent));
-        }
-      frontier.pop_back();
-
-      if (rev_file_deltas.find(parent) != rev_file_deltas.end())
-        {
-          for (set<file_id>::const_iterator f = rev_file_deltas[parent].begin();
-               f != rev_file_deltas[parent].end(); f++)
+          file_id const & parent = d->second;
+          I(!null_id(parent));
+          if (!sess.app.db.file_version_exists(parent))
             {
-              frontier.push_back( make_pair(parent, *f) );
+              L(F("requesting reverse file delta %s->%s")
+                % child % parent);
+              sess.queue_send_delta_cmd(file_item,
+                                        plain_id(child), plain_id(parent));
+              sess.reverse_delta_requests.insert(make_pair(plain_id(child),
+                                                           plain_id(parent)));
             }
+          else
+            {
+              L(F("already have file %s, not requesting rev delta")
+                % parent);
+            }
+          frontier.push(parent);
         }
     }
 }
@@ -4108,87 +3833,73 @@ ancestry_fetcher::request_files()
 void
 ancestry_fetcher::request_files()
 {
-
-  L(F("request_files: %d full, %d fwd_deltas")
-    % full_files.size() % fwd_file_deltas.size());
-
-  L(F("rev files deltas are:"));
-  for (map<file_id,set<file_id> >::const_iterator d = rev_file_deltas.begin();
-       d != rev_file_deltas.end(); d++)
+  for (multimap<file_id,file_id>::const_iterator d = fwd_file_deltas.begin();
+       d != fwd_file_deltas.end(); d++)
     {
-      L(F("child %s") % d->first);
-      for (set<file_id>::const_iterator p = d->second.begin();
-           p != d->second.end(); p++)
+      file_id const & anc = d->first;
+      file_id const & child = d->second;
+      if (!sess.app.db.file_version_exists(child))
         {
-          L(F("\tpar %s") % *p);
+          if (null_id(anc)
+              || !sess.app.db.file_version_exists(anc))
+            {
+              L(F("requesting full file %s") % child);
+              sess.queue_send_data_cmd(file_item, plain_id(child));
+            }
+          else
+            {
+              L(F("requesting forward delta %s->%s")
+                % anc % child);
+              sess.queue_send_delta_cmd(file_item,
+                                        plain_id(anc), plain_id(child));
+              sess.note_item_full_delta(file_item, plain_id(child));
+            }
         }
-    }
-
-  // we start with the full files, and work
-  // our way back.
-  for (set<file_id>::const_iterator f = full_files.begin();
-       f != full_files.end(); f++)
-    {
-      L(F("requesting full file data %s") % *f);
-      sess.queue_send_data_cmd(file_item, plain_id(*f));
-      request_rev_file_deltas(*f);
-      anchor_files.erase(*f);
-    }
-  full_files.clear();
+      else
+        {
+          L(F("not requesting fwd delta %s->%s, already have dst")
+            % anc % child);
+        }

-  // and now the full files we'll get from fwd deltas
-  for (map<file_id,file_id>::const_iterator d = fwd_file_deltas.begin();
-       d != fwd_file_deltas.end(); d++)
-    {
-      L(F("requesting fwd file delta %s->%s") % d->first % d->second);
-      sess.queue_send_delta_cmd(file_item, plain_id(d->first), plain_id(d->second));
-      sess.note_item_full_delta(file_item, plain_id(d->second));
-      request_rev_file_deltas(d->second);
-      anchor_files.erase(d->second);
+      // traverse up the reverse deltas
+      request_rev_file_deltas(child);
     }
-
-  // and finally all the remaining anchor files
-  for (set<file_id>::const_iterator f = anchor_files.begin();
-       f != anchor_files.end(); f++)
-    {
-      L(F("rev deltas from anchor file %s") % *f);
-      request_rev_file_deltas(*f);
-    }
-  anchor_files.clear();
 }

 void
 ancestry_fetcher::request_rev_manifest_deltas(manifest_id const & start)
 {
-  vector< pair<manifest_id,manifest_id> > frontier;
-  frontier.push_back(make_pair(manifest_id(), start));
+  stack< manifest_id > frontier;
+  frontier.push(start);

-  L(F("request_rev_manifest_deltas: start %s, %d rev_manifest_deltas")
-    % start % rev_manifest_deltas.size());
-
-  // depth first
   while (!frontier.empty())
     {
-      manifest_id child = frontier.back().first;
-      manifest_id parent = frontier.back().second;
+      manifest_id const child = frontier.top();
+      I(!null_id(child));
+      frontier.pop();

-      // request it
-      if (!null_id(child))
+      for (multimap< manifest_id, manifest_id>::const_iterator
+           d = rev_manifest_deltas.lower_bound(child);
+           d != rev_manifest_deltas.upper_bound(child);
+           d++)
         {
-          L(F("requesting rev manifest delta child %s -> parent %s")
-            % child % parent);
-          sess.queue_send_delta_cmd(manifest_item,
-                                    plain_id(child), plain_id(parent));
-        }
-      frontier.pop_back();
-
-      if (rev_manifest_deltas.find(parent) != rev_manifest_deltas.end())
-        {
-          for (set<manifest_id>::const_iterator m = rev_manifest_deltas[parent].begin();
-               m != rev_manifest_deltas[parent].end(); m++)
+          manifest_id const & parent = d->second;
+          I(!null_id(parent));
+          if (!sess.app.db.manifest_version_exists(parent))
             {
-              frontier.push_back( make_pair(parent, *m) );
+              L(F("requesting reverse manifest delta %s->%s")
+                % child % parent);
+              sess.queue_send_delta_cmd(manifest_item,
+                                        plain_id(child), plain_id(parent));
+              sess.reverse_delta_requests.insert(make_pair(plain_id(child),
+                                                           plain_id(parent)));
             }
+          else
+            {
+              L(F("already have manifest %s, not requesting rev delta")
+                % parent);
+            }
+          frontier.push(parent);
         }
     }
 }
@@ -4196,28 +3907,35 @@ ancestry_fetcher::request_manifests()
 void
 ancestry_fetcher::request_manifests()
 {
-  L(F("request_manifests: %d full, %d fwd_deltas")
-    % full_manifests.size() % fwd_manifest_deltas.size());
-
-  // we start with the full manifests, and work
-  // our way back.
-  for (set<manifest_id>::const_iterator f = full_manifests.begin();
-       f != full_manifests.end(); f++)
+  for (multimap<manifest_id,manifest_id>::const_iterator d = fwd_manifest_deltas.begin();
+       d != fwd_manifest_deltas.end(); d++)
     {
-      L(F("requesting full manifest data %s") % *f);
-      sess.queue_send_data_cmd(manifest_item, plain_id(*f));
-      request_rev_manifest_deltas(*f);
-    }
-  full_manifests.clear();
+      manifest_id const & anc = d->first;
+      manifest_id const & child = d->second;
+      if (!sess.app.db.manifest_version_exists(child))
+        {
+          if (null_id(anc)
+              || !sess.app.db.manifest_version_exists(anc))
+            {
+              L(F("requesting full manifest %s") % child);
+              sess.queue_send_data_cmd(manifest_item, plain_id(child));
+            }
+          else
+            {
+              L(F("requesting forward delta %s->%s")
+                % anc % child);
+              sess.queue_send_delta_cmd(manifest_item,
+                                        plain_id(anc), plain_id(child));
+              sess.note_item_full_delta(manifest_item, plain_id(child));
+            }
+        }
+      else
+        {
+          L(F("not requesting fwd delta %s->%s, already have dst")
+            % anc % child);
+        }

-  // and now the full manifests we'll get from fwd deltas
-  for (map<manifest_id,manifest_id>::const_iterator d = fwd_manifest_deltas.begin();
-       d != fwd_manifest_deltas.end(); d++)
-    {
-      L(F("requesting fwd manifest delta %s->%s") % d->first % d->second);
-      sess.queue_send_delta_cmd(manifest_item, plain_id(d->first), plain_id(d->second));
-      sess.note_item_full_delta(manifest_item, plain_id(d->second));
-      request_rev_manifest_deltas(d->second);
+      // traverse up the reverse deltas
+      request_rev_manifest_deltas(child);
     }
 }
-
============================================================
--- packet.cc	407e15cf1c485e09665a372588be2f99491a17d7
+++ packet.cc	0867d1963018a7de23a7ca07be1e637817aaeb50
@@ -246,12 +246,14 @@ delayed_file_delta_packet
   file_id new_id;
   file_delta del;
   bool forward_delta;
+  bool write_full;
 public:
   delayed_file_delta_packet(file_id const & oi,
                             file_id const & ni,
                             file_delta const & md,
-                            bool fwd)
-    : old_id(oi), new_id(ni), del(md), forward_delta(fwd)
+                            bool fwd,
+                            bool full = false)
+    : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full)
   {}
   virtual void apply_delayed_packet(packet_db_writer & pw);
   virtual ~delayed_file_delta_packet();
@@ -265,12 +267,14 @@ delayed_manifest_delta_packet
   manifest_id new_id;
   manifest_delta del;
   bool forward_delta;
+  bool write_full;
 public:
   delayed_manifest_delta_packet(manifest_id const & oi,
                                 manifest_id const & ni,
                                 manifest_delta const & md,
-                                bool fwd)
-    : old_id(oi), new_id(ni), del(md), forward_delta(fwd)
+                                bool fwd,
+                                bool full = false)
+    : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full)
   {}
   virtual void apply_delayed_packet(packet_db_writer & pw);
   virtual ~delayed_manifest_delta_packet();
@@ -361,12 +365,13 @@ delayed_manifest_delta_packet::apply_del
 void
 delayed_manifest_delta_packet::apply_delayed_packet(packet_db_writer & pw)
 {
-  L(F("writing delayed manifest %s packet for %s -> %s\n")
+  L(F("writing delayed manifest %s packet for %s -> %s%s\n")
     % (forward_delta ? "delta" : "reverse delta")
     % (forward_delta ? old_id : new_id)
-    % (forward_delta ? new_id : old_id));
+    % (forward_delta ? new_id : old_id)
+    % (write_full ? " (writing in full)" : ""));
   if (forward_delta)
-    pw.consume_manifest_delta(old_id, new_id, del);
+      pw.consume_manifest_delta(old_id, new_id, del, write_full);
   else
     pw.consume_manifest_reverse_delta(new_id, old_id, del);
 }
@@ -381,12 +386,13 @@ delayed_file_delta_packet::apply_delayed
 void
 delayed_file_delta_packet::apply_delayed_packet(packet_db_writer & pw)
 {
-  L(F("writing delayed file %s packet for %s -> %s\n")
+  L(F("writing delayed file %s packet for %s -> %s%s\n")
     % (forward_delta ? "delta" : "reverse delta")
     % (forward_delta ? old_id : new_id)
-    % (forward_delta ? new_id : old_id));
+    % (forward_delta ? new_id : old_id)
+    % (write_full ? " (writing in full)" : ""));
   if (forward_delta)
-    pw.consume_file_delta(old_id, new_id, del);
+    pw.consume_file_delta(old_id, new_id, del, write_full);
   else
     pw.consume_file_reverse_delta(new_id, old_id, del);
 }
@@ -656,6 +662,15 @@ packet_db_writer::consume_file_delta(fil
                                      file_id const & new_id,
                                      file_delta const & del)
 {
+  consume_file_delta(old_id, new_id, del, false);
+}
+
+void
+packet_db_writer::consume_file_delta(file_id const & old_id,
+                                     file_id const & new_id,
+                                     file_delta const & del,
+                                     bool write_full)
+{
   transaction_guard guard(pimpl->app.db);
   if (! pimpl->file_version_exists_in_db(new_id))
     {
@@ -669,7 +684,10 @@ packet_db_writer::consume_file_delta(fil
           calculate_ident(file_data(new_dat), confirm);
           if (confirm == new_id)
             {
-              pimpl->app.db.put_file_version(old_id, new_id, del);
+              if (!write_full)
+                pimpl->app.db.put_file_version(old_id, new_id, del);
+              else
+                pimpl->app.db.put_file(new_id, file_data(new_dat));
               pimpl->accepted_file(new_id, *this);
             }
           else
@@ -682,7 +700,7 @@ packet_db_writer::consume_file_delta(fil
         {
           L(F("delaying file delta %s -> %s for preimage\n") % old_id % new_id);
           shared_ptr<delayed_packet> dp;
-          dp = shared_ptr<delayed_packet>(new delayed_file_delta_packet(old_id, new_id, del, true));
+          dp = shared_ptr<delayed_packet>(new delayed_file_delta_packet(old_id, new_id, del, true, write_full));
           shared_ptr<prerequisite> fp;
           pimpl->get_file_prereq(old_id, fp);
           dp->add_prerequisite(fp);
@@ -761,6 +779,15 @@ packet_db_writer::consume_manifest_delta
                                          manifest_id const & new_id,
                                          manifest_delta const & del)
 {
+  consume_manifest_delta(old_id, new_id, del, false);
+}
+
+void
+packet_db_writer::consume_manifest_delta(manifest_id const & old_id,
+                                         manifest_id const & new_id,
+                                         manifest_delta const & del,
+                                         bool write_full)
+{
   transaction_guard guard(pimpl->app.db);
   if (! pimpl->manifest_version_exists_in_db(new_id))
     {
@@ -774,7 +801,11 @@ packet_db_writer::consume_manifest_delta
           calculate_ident(manifest_data(new_dat), confirm);
           if (confirm == new_id)
             {
-              pimpl->app.db.put_manifest_version(old_id, new_id, del);
+              if (!write_full)
+                pimpl->app.db.put_manifest_version(old_id, new_id, del);
+              else
+                pimpl->app.db.put_manifest(new_id, manifest_data(new_dat));
+
               pimpl->accepted_manifest(new_id, *this);
             }
           else
@@ -787,7 +818,7 @@ packet_db_writer::consume_manifest_delta
         {
           L(F("delaying manifest delta %s -> %s for preimage\n") % old_id % new_id);
           shared_ptr<delayed_packet> dp;
-          dp = shared_ptr<delayed_packet>(new delayed_manifest_delta_packet(old_id, new_id, del, true));
+          dp = shared_ptr<delayed_packet>(new delayed_manifest_delta_packet(old_id, new_id, del, true, write_full));
           shared_ptr<prerequisite> fp;
           pimpl->get_manifest_prereq(old_id, fp);
           dp->add_prerequisite(fp);
@@ -1108,6 +1139,15 @@ void
 }

 void
+packet_db_valve::consume_file_delta(file_id const & id_old,
+                                    file_id const & id_new,
+                                    file_delta const & del,
+                                    bool write_full)
+{
+  DOIT(delayed_file_delta_packet(id_old, id_new, del, true, write_full));
+}
+
+void
 packet_db_valve::consume_file_reverse_delta(file_id const & id_new,
                                             file_id const & id_old,
                                             file_delta const & del)
@@ -1131,6 +1171,15 @@ void
 }

 void
+packet_db_valve::consume_manifest_delta(manifest_id const & id_old,
+                                        manifest_id const & id_new,
+                                        manifest_delta const & del,
+                                        bool write_full)
+{
+  DOIT(delayed_manifest_delta_packet(id_old, id_new, del, true, write_full));
+}
+
+void
 packet_db_valve::consume_manifest_reverse_delta(manifest_id const & id_new,
                                                 manifest_id const & id_old,
                                                 manifest_delta const & del)
============================================================
--- packet.hh	ec7178c2332473305c0aa7d00c727e338fc7810d
+++ packet.hh	c2d6f03ffa901f98ad6623090b687982d27db552
@@ -136,6 +136,10 @@ public:
   virtual void consume_file_delta(file_id const & id_old,
                                   file_id const & id_new,
                                   file_delta const & del);
+  virtual void consume_file_delta(file_id const & id_old,
+                                  file_id const & id_new,
+                                  file_delta const & del,
+                                  bool write_full);
   virtual void consume_file_reverse_delta(file_id const & id_new,
                                           file_id const & id_old,
                                           file_delta const & del);
@@ -145,6 +149,10 @@ public:
   virtual void consume_manifest_delta(manifest_id const & id_old,
                                       manifest_id const & id_new,
                                       manifest_delta const & del);
+  virtual void consume_manifest_delta(manifest_id const & id_old,
+                                      manifest_id const & id_new,
+                                      manifest_delta const & del,
+                                      bool write_full);
   virtual void consume_manifest_reverse_delta(manifest_id const & id_new,
                                               manifest_id const & id_old,
                                               manifest_delta const & del);
@@ -185,6 +193,10 @@ public:
   virtual void consume_file_delta(file_id const & id_old,
                                   file_id const & id_new,
                                   file_delta const & del);
+  virtual void consume_file_delta(file_id const & id_old,
+                                  file_id const & id_new,
+                                  file_delta const & del,
+                                  bool write_full);
   virtual void consume_file_reverse_delta(file_id const & id_new,
                                           file_id const & id_old,
                                           file_delta const & del);
@@ -194,6 +206,10 @@ public:
   virtual void consume_manifest_delta(manifest_id const & id_old,
                                       manifest_id const & id_new,
                                       manifest_delta const & del);
+  virtual void consume_manifest_delta(manifest_id const & id_old,
+                                      manifest_id const & id_new,
+                                      manifest_delta const & del,
+                                      bool write_full);
   virtual void consume_manifest_reverse_delta(manifest_id const & id_new,
                                               manifest_id const & id_old,
                                               manifest_delta const & del);