// Below is the file 'enumerator.cc' from this revision. You can also download the file.
// copyright (C) 2005 graydon hoare <graydon@pobox.com> // all rights reserved. // licensed to the public under the terms of the GNU GPL (>= 2) // see the file COPYING for details #include <deque> #include <map> #include <set> #include <vector> #include "cset.hh" #include "enumerator.hh" #include "revision.hh" #include "vocab.hh" using std::deque; using std::make_pair; using std::map; using std::multimap; using std::pair; using std::set; using std::vector; revision_enumerator::revision_enumerator(enumerator_callbacks & cb, app_state & app, set<revision_id> const & initial, set<revision_id> const & terminal) : cb(cb), app(app), terminal_nodes(terminal) { // TODO: needs sorting out with the toposort stuff. I(false); /* for (set<revision_id>::const_iterator i = initial.begin(); i != initial.end(); ++i) revs.push_back(*i); */ load_revs(); } revision_enumerator::revision_enumerator(enumerator_callbacks & cb, app_state & app) : cb(cb), app(app) { revision_id root; load_revs(); } void revision_enumerator::load_revs() { // TODO: this totally disrespects the initial and terminal stuff. // It should probably be integrated into the toposort code itself. vector<revision_id> topo_vec; toposort(topo_vec, app.db); for (vector<revision_id>::const_iterator r = topo_vec.begin(); r != topo_vec.end(); r++) { if (null_id(*r)) continue; topo_revs.push_back(*r); } } bool revision_enumerator::done() { return topo_revs.empty() && items.empty(); } void revision_enumerator::files_for_revision(revision_id const & r, set<file_id> & full_files, set<pair<file_id,file_id> > & del_files) { // when we're sending a merge, we have to be careful if we // want to send as little data as possible. see bug #15846 // // njs's solution: "when sending the files for a revision, // look at both csets. If a given hash is not listed as new // in _both_ csets, throw it out. Now, for everything left // over, if one side says "add" and the other says "delta", // do a delta. If both sides say "add", do a data." 
set<file_id> file_adds; // map<dst, src>. src is arbitrary if there are multiples. map<file_id, file_id> file_deltas; map<file_id, size_t> file_edge_counts; revision_set rs; MM(rs); app.db.get_revision(r, rs); for (edge_map::const_iterator i = rs.edges.begin(); i != rs.edges.end(); ++i) { set<file_id> file_dsts; cset const & cs = edge_changes(i); // Queue up all the file-adds for (map<split_path, file_id>::const_iterator fa = cs.files_added.begin(); fa != cs.files_added.end(); ++fa) { file_adds.insert(fa->second); file_dsts.insert(fa->second); } // Queue up all the file-deltas for (map<split_path, std::pair<file_id, file_id> >::const_iterator fd = cs.deltas_applied.begin(); fd != cs.deltas_applied.end(); ++fd) { file_deltas[fd->second.second] = fd->second.first; file_dsts.insert(fd->second.second); } // we don't want to be counting files twice in a single edge for (set<file_id>::const_iterator i = file_dsts.begin(); i != file_dsts.end(); i++) file_edge_counts[*i]++; } del_files.clear(); full_files.clear(); size_t num_edges = rs.edges.size(); for (map<file_id, size_t>::const_iterator i = file_edge_counts.begin(); i != file_edge_counts.end(); i++) { MM(i->first); L(FL("edge %s count %d") % i->first.inner()() % i->second); if (i->second < num_edges) continue; // first preference is to send as a delta... map<file_id, file_id>::const_iterator fd = file_deltas.find(i->first); if (fd != file_deltas.end()) { del_files.insert(make_pair(fd->second, fd->first)); continue; } // ... otherwise as a full file. 
set<file_id>::const_iterator f = file_adds.find(i->first); if (f != file_adds.end()) { full_files.insert(*f); continue; } I(false); } } void revision_enumerator::process_bunch() { // we build up a set of files and revs to send vector<revision_id> bunch_revs; set<file_id> bunch_files; multimap<file_id, file_id> bunch_file_deltas; // files that should be sent first set<file_id> top_files; // files that will be sent either as full files are as the dst of deltas set<file_id> dst_files; while (bunch_revs.size() < 100 && !topo_revs.empty()) { revision_id r = topo_revs.front(); topo_revs.pop_front(); if (!cb.process_this_rev(r)) continue; bunch_revs.push_back(r); set<file_id> full_files; set<pair<file_id, file_id> > del_files; files_for_revision(r, full_files, del_files); for (set<file_id>::const_iterator f = full_files.begin(); f != full_files.end(); f++) { L(FL("full_file %s") % f->inner()()); bunch_files.insert(*f); top_files.insert(*f); dst_files.insert(*f); } for (set<pair<file_id,file_id> >::const_iterator fd = del_files.begin(); fd != del_files.end(); fd++) { L(FL("del_file %s->%s") % fd->first.inner()() % fd->second.inner()()); file_id src(fd->first); file_id dst(fd->second); bunch_file_deltas.insert(make_pair(fd->first, fd->second)); if (dst_files.find(src) == dst_files.end()) { L(FL("added del_file to top_files")); top_files.insert(src); } dst_files.insert(dst); } } // XXX required? set<file_id> sent_files; // now we can queue up the file items in order. for (set<file_id>::const_iterator t = top_files.begin(); t != top_files.end(); t++) { if (sent_files.find(*t) != sent_files.end()) { L(FL("already sent top_file %s") % t->inner()()); continue; } L(FL("top_file %s") % t->inner()()); if (bunch_files.find(*t) != bunch_files.end()) { // a full file to send. 
enumerator_item item; item.tag = enumerator_item::fdata; item.ident_a = t->inner(); items.push_back(item); sent_files.insert(*t); L(FL("send full_file %s") % t->inner()()); } // XXX: doing BFS now, should try DFS too. deque<file_id> frontier; frontier.push_back(*t); while (!frontier.empty()) { file_id f = frontier.front(); frontier.pop_front(); L(FL("frontier %s") % f.inner()()); for (multimap<file_id,file_id>::const_iterator d = bunch_file_deltas.lower_bound(f); d != bunch_file_deltas.upper_bound(f); d++) { if (sent_files.find(d->second) != sent_files.end()) { L(FL("already sent delta %s") % d->second.inner()()); continue; } sent_files.insert(d->second); frontier.push_back(d->second); enumerator_item item; item.tag = enumerator_item::fdelta; item.ident_a = d->first.inner(); item.ident_b = d->second.inner(); items.push_back(item); L(FL("file_delta %s->%s") % d->first.inner()() % d->second.inner()()); } } } // and the revs for (vector<revision_id>::const_iterator r = bunch_revs.begin(); r != bunch_revs.end(); r++) { enumerator_item item; item.tag = enumerator_item::rev; item.ident_a = r->inner(); items.push_back(item); // Queue up some or all of the rev's certs vector<hexenc<id> > hashes; app.db.get_revision_certs(*r, hashes); for (vector<hexenc<id> >::const_iterator i = hashes.begin(); i != hashes.end(); ++i) { if (cb.queue_this_cert(*i)) { enumerator_item item; item.tag = enumerator_item::cert; item.ident_a = *i; items.push_back(item); } } } } void revision_enumerator::step() { while (!done()) { if (items.empty() && !topo_revs.empty()) { process_bunch(); } if (!items.empty()) { L(FL("revision_enumerator::step extracting item\n")); enumerator_item i = items.front(); items.pop_front(); I(!null_id(i.ident_a)); switch (i.tag) { case enumerator_item::fdata: cb.note_file_data(file_id(i.ident_a)); break; case enumerator_item::fdelta: I(!null_id(i.ident_b)); cb.note_file_delta(file_id(i.ident_a), file_id(i.ident_b)); break; case enumerator_item::rev: 
cb.note_rev(revision_id(i.ident_a)); break; case enumerator_item::cert: cb.note_cert(i.ident_a); break; } break; } } }