// Below is the file 'enumerator.cc' from this revision. You can also download the file.

// copyright (C) 2005 graydon hoare <graydon@pobox.com>
// all rights reserved.
// licensed to the public under the terms of the GNU GPL (>= 2)
// see the file COPYING for details

#include <deque>
#include <map>
#include <set>
#include <vector>

#include "cset.hh"
#include "enumerator.hh"
#include "revision.hh"
#include "vocab.hh"

using std::deque;
using std::make_pair;
using std::map;
using std::multimap;
using std::pair;
using std::set;
using std::vector;

// Construct an enumerator meant to be restricted to a range of revisions.
//
//   cb       -- callbacks consulted and notified as items are enumerated
//   app      -- application state; load_revs() reads app.db
//   initial  -- currently unused: the code that consumed it is disabled below
//   terminal -- copied into terminal_nodes
//
// NOTE: the unconditional I(false) below means this constructor is not
// usable yet. Presumably I() is the project's invariant check and fails
// here before load_revs() is reached -- confirm against its definition.
revision_enumerator::revision_enumerator(enumerator_callbacks & cb,
                                         app_state & app,
                                         set<revision_id> const & initial,
                                         set<revision_id> const & terminal)
  : cb(cb), app(app), terminal_nodes(terminal)
{
  // TODO: needs sorting out with the toposort stuff.
  I(false);
  // Disabled seeding of the work list from 'initial', kept for reference:
  /*
  for (set<revision_id>::const_iterator i = initial.begin();
       i != initial.end(); ++i)
    revs.push_back(*i);
    */
  load_revs();
}

// Construct an enumerator with no explicit initial/terminal sets.
//
//   cb  -- callbacks consulted and notified as items are enumerated
//   app -- application state; load_revs() reads app.db
//
// terminal_nodes is not named in the initializer list, so it is left
// default-initialized. The pending-revision list is filled by load_revs().
revision_enumerator::revision_enumerator(enumerator_callbacks & cb,
                                         app_state & app)
  : cb(cb), app(app)
{
  load_revs();
}

void
revision_enumerator::load_revs()
{
  // TODO: this totally disrespects the initial and terminal stuff.
  // It should probably be integrated into the toposort code itself.
  vector<revision_id> topo_vec;
  toposort(topo_vec, app.db);
  for (vector<revision_id>::const_iterator r = topo_vec.begin();
       r != topo_vec.end(); r++)
    {
      if (null_id(*r))
        continue;
      topo_revs.push_back(*r);
    }
}

// True once there are neither pending revisions (topo_revs) nor queued
// items (items) left to hand out.
bool
revision_enumerator::done()
{
  if (!topo_revs.empty())
    return false;
  return items.empty();
}

void
revision_enumerator::process_bunch()
{
  // we build up a set of files and revs to send
  vector<revision_id> bunch_revs;
  set<file_id> bunch_files;
  multimap<file_id, file_id> bunch_file_deltas;

  // files that should be sent first
  set<file_id> top_files;
  // files that will be sent either as full files are as the dst of deltas
  set<file_id> dst_files;

  while (bunch_revs.size() < 100 && !topo_revs.empty())
    {
      revision_id r = topo_revs.front();
      topo_revs.pop_front();
      if (!cb.process_this_rev(r))
        continue;

      bunch_revs.push_back(r);

      revision_set rs;
      app.db.get_revision(r, rs);
      for (edge_map::const_iterator i = rs.edges.begin();
           i != rs.edges.end(); ++i)
        {
          cset const & cs = edge_changes(i);

          // Queue up all the file-adds
          for (map<split_path, file_id>::const_iterator fa = cs.files_added.begin();
               fa != cs.files_added.end(); ++fa)
            {
              if (cb.queue_this_file(fa->second.inner()))
                {
                  dst_files.insert(fa->second);
                  top_files.insert(fa->second);
                  bunch_files.insert(fa->second);
                }
            }

          // Queue up all the file-deltas
          for (map<split_path, std::pair<file_id, file_id> >::const_iterator fd
                 = cs.deltas_applied.begin();
               fd != cs.deltas_applied.end(); ++fd)
            {
              if (cb.queue_this_file(fd->second.second.inner()))
                {
                  if (dst_files.find(fd->second.first) == dst_files.end())
                    {
                      top_files.insert(fd->second.first);
                    }

                  bunch_file_deltas.insert(make_pair(fd->second.first,
                                                     fd->second.second));
                  dst_files.insert(fd->second.second);
                }
            }
        }
    }

  // XXX required?
  set<file_id> sent_files;

  // now we can queue up the file items in order.
  for (set<file_id>::const_iterator t = top_files.begin();
       t != top_files.end(); t++)
    {
      if (sent_files.find(*t) != sent_files.end())
        {
          L(FL("already sent top_file %s") % t->inner()());
          continue;
        }

      L(FL("top_file %s") % t->inner()());

      if (bunch_files.find(*t) != bunch_files.end())
        {
          // a full file to send.
          enumerator_item item;
          item.tag = enumerator_item::fdata;
          item.ident_a = t->inner();
          items.push_back(item);
          sent_files.insert(*t);
          L(FL("send full_file %s") % t->inner()());
        }

      // XXX: doing BFS now, should try DFS too.
      deque<file_id> frontier;
      frontier.push_back(*t);

      while (!frontier.empty())
        {
          file_id f = frontier.front();
          frontier.pop_front();

          L(FL("frontier %s") % f.inner()());

          for (multimap<file_id,file_id>::const_iterator
               d = bunch_file_deltas.lower_bound(f);
               d != bunch_file_deltas.upper_bound(f);
               d++)
            {
              if (sent_files.find(d->second) != sent_files.end())
                {
                  L(FL("already sent delta %s") % d->second.inner()());
                  continue;
                }
              sent_files.insert(d->second);
              frontier.push_back(d->second);

              enumerator_item item;
              item.tag = enumerator_item::fdelta;
              item.ident_a = d->first.inner();
              item.ident_b = d->second.inner();
              items.push_back(item);
              L(FL("file_delta %s->%s") % d->first.inner()() % d->second.inner()());
            }
        }
    }

  // and the revs
  for (vector<revision_id>::const_iterator r = bunch_revs.begin();
       r != bunch_revs.end(); r++)
    {
      enumerator_item item;
      item.tag = enumerator_item::rev;
      item.ident_a = r->inner();
      items.push_back(item);

      // Queue up some or all of the rev's certs
      vector<hexenc<id> > hashes;
      app.db.get_revision_certs(*r, hashes);
      for (vector<hexenc<id> >::const_iterator i = hashes.begin();
           i != hashes.end(); ++i)
        {
          if (cb.queue_this_cert(*i))
            {
              enumerator_item item;
              item.tag = enumerator_item::cert;
              item.ident_a = *i;
              items.push_back(item);
            }
        }
    }
}

void
revision_enumerator::step()
{
  while (!done())
    {
      if (items.empty() && !topo_revs.empty())
        {
          process_bunch();
        }

      if (!items.empty())
        {
          L(FL("revision_enumerator::step extracting item\n"));

          enumerator_item i = items.front();
          items.pop_front();
          I(!null_id(i.ident_a));

          switch (i.tag)
            {
            case enumerator_item::fdata:
              cb.note_file_data(file_id(i.ident_a));
              break;

            case enumerator_item::fdelta:
              I(!null_id(i.ident_b));
              cb.note_file_delta(file_id(i.ident_a),
                                 file_id(i.ident_b));
              break;

            case enumerator_item::rev:
              cb.note_rev(revision_id(i.ident_a));
              break;

            case enumerator_item::cert:
              cb.note_cert(i.ident_a);
              break;
            }
          break;
        }
    }
}