Below is the file 'contrib/usher.cc' from this revision. You can also download the file.

// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*-
// Timothy Brownawell  <tbrownaw@gmail.com>
// GPL v2
//
// This is an "usher" to allow multiple monotone servers to work from
// the same port. It asks the client what it wants to sync,
// and then looks up the matching server in a table. It then forwards
// the connection to that server. All servers using the same usher need
// to have the same server key.
//
// This requires cooperation from the client, which means it only works
// for recent (0.23 or later) clients. In order to match against hostnames
// a post-0.23 client is needed (0.23 clients can only be matched against
// their include pattern).
//
// Usage: usher [-l address[:port]] [-a address:port] [-p pidfile] <server-file>
//
// options:
// -m   the monotone command, defaults to "monotone"
// -l   address and port to listen on, defaults to 0.0.0.0:4691
// -a   address and port to listen for admin commands
// -p   a file (deleted on program exit) to record the pid of the usher in
// <server-file>   a file that looks like
//   userpass username password
//
//   server monotone
//   host localhost
//   pattern net.venge.monotone
//   remote 66.96.28.3:4691
//
//   server local
//   host 127.0.0.1
//   pattern *
//   local -d /usr/local/src/managed/mt.db~ *
//
// or in general, one block of one or more lines of
//   userpass <username> <password>
// followed by any number of blocks of a
//   server <name>
// line followed by one or more
//   host <hostname>
// lines and/or one or more
//   pattern <pattern>
// lines, and one of
//    remote <address:port>
//    local <arguments>
// , with blocks separated by blank lines
//
// "userpass" lines specify who is allowed to use the administrative port.
//
// A request to server "hostname" will be directed to the
// server at <ip-address>:<port-number>, if that stem is marked as remote,
// and to a local server managed by the usher, started with the given
// arguments ("monotone serve --bind=something <server arguments>"),
// if it is marked as local.
// Note that "hostname" has to be an initial substring of who the client asked
// to connect to, but does not have to match exactly. This means that you don't
// have to know in advance whether clients will be asking for
// <host> or <host>:<port> .
//
//
// Admin commands
//
// If the -a option is given, the usher will listen for administrative
// connections on that port. The connecting client gives commands of the form
//   COMMAND [arguments] <newline>
// , and after any command except USERPASS the usher will send a reply and
// close the connection. The reply will always end with a newline.
//
// USERPASS username password
// Required before any other command, so random people can't do bad things.
// If incorrect, the connection will be closed immediately.
//
// STATUS [servername]
// Get the status of a server, as named by the "server" lines in the
// config file. If a server is specified, the result will be one of:
//   REMOTE - this is a remote server without active connections
//   ACTIVE n - this server currently has n active connections
//   WAITING - this (local) server is running, but has no connections
//   SLEEPING - this (local) server is not running, but is available
//   STOPPING n - this (local) server has been asked to stop, but still has
//                n active connections. It will not accept further connections.
//   STOPPED - this (local) server has been stopped, and will not accept
//             connections. The server process is not running.
//   SHUTTINGDOWN - the usher has been shut down, no servers are accepting
//                  connections.
//   SHUTDOWN - the usher has been shut down, all connections have been closed,
//              and all local server processes have been stopped.
// If no server is specified, the repsonse will be SHUTTINGDOWN, SHUTDOWN,
// WAITING, or ACTIVE (with n being the total number of open connections,
// across all servers).
//
// STOP servername
// Prevent the given local server from receiving further connections, and stop
// it once all connections are closed. The result will be the new status of
// that server: ACTIVE local servers become STOPPING, and WAITING and SLEEPING
// servers become STOPPED. Servers in other states are not affected.
//
// START servername
// Allow a stopped or stopping server to receive connections again. The result
// will be the new status of that server. (A server in the "STOPPING" state
// becomes ACTIVE, and a STOPPED server becomes SLEEPING. A server in some
// other state is not affected.)
//
// LIST [state]
// Returns a space-separated list of all servers. If a state is given, only
// list the servers that are in that state.
//
// SHUTDOWN
// Do not accept new connections for any servers, local or remote. Returns "ok".
//
// STARTUP
// Begin accepting connections again after a SHUTDOWN. Returns "ok".
//
// CONNECTIONS
// Returns the number of connections currently open.
//
// RELOAD
// Reload the config file (same as sending SIGHUP). The reply will be "ok",
// and will not be given until the config file has been reloaded.
//

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <time.h>

#include <string>
#include <list>
#include <iostream>
#include <sstream>
#include <fstream>
#include <vector>
#include <set>
#include <map>

#include <boost/lexical_cast.hpp>
#include <boost/shared_ptr.hpp>

using std::vector;
using std::max;
using std::string;
using std::list;
using std::set;
using std::map;
using boost::lexical_cast;
using boost::shared_ptr;
using std::cerr;
using std::pair;
using std::make_pair;

// defaults, overridden by command line
int listenport = 4691;
string listenaddr = "0.0.0.0";
string monotone = "monotone";

// keep local servers around for this many seconds after the last
// client disconnects from them (only accurate to ~10 seconds)
int const server_idle_timeout = 60;

// ranges that dynamic (local) servers can be put on
int const minport = 15000;
int const maxport = 65000;
int const minaddr[] = {127, 0, 1, 1};
int const maxaddr[] = {127, 254, 254, 254};
int currport = 0;
int curraddr[] = {0, 0, 0, 0};

char const netsync_version = 6;

string const greeting = " Hello! This is the monotone usher at localhost. What would you like?";

string const notfound = "!Sorry, I don't know where to find that.";

string const disabled = "!Sorry, this usher is not currently accepting connections.";

string const srvdisabled = "!Sorry, that server is currently disabled.";

struct errstr
{
  std::string name;
  int err;
  errstr(std::string const & s): name(s), err(0) {}
  errstr(std::string const & s, int e): name(s), err(e) {}
};

int tosserr(int ret, std::string const & name)
{
  if (ret == -1)
    throw errstr(name, errno);
  if (ret < 0)
    throw errstr(name, ret);
  return ret;
}

// packet format is:
// byte version
// byte cmd {100 if we send, 101 if we receive}
// uleb128 {size of everything after this}
// uleb128 {size of string}
// string
// {
// uleb128 {size of string}
// string
// } // only present if we're receiving

// uleb128 is
// byte 0x80 | <low 7 bits>
// byte 0x80 | <next 7 bits>
// ...
// byte 0xff & <remaining bits>
//
// the high bit says that this byte is not the last

void make_packet(std::string const & msg, char * & pkt, int & size)
{
  size = msg.size();
  char const * txt = msg.c_str();
  char header[6];
  header[0] = netsync_version;
  header[1] = 100;
  int headersize;
  if (size >= 128) {
    header[2] = 0x80 | (0x7f & (char)(size+2));
    header[3] = (char)((size+2)>>7);
    header[4] = 0x80 | (0x7f & (char)(size));
    header[5] = (char)(size>>7);
    headersize = 6;
  } else if (size >= 127) {
    header[2] = 0x80 | (0x7f & (char)(size+1));
    header[3] = (char)((size+1)>>7);
    header[4] = (char)(size);
    headersize = 5;
  } else {
    header[2] = (char)(size+1);
    header[3] = (char)(size);
    headersize = 4;
  }
  pkt = new char[headersize + size];
  memcpy(pkt, header, headersize);
  memcpy(pkt + headersize, txt, size);
  size += headersize;
}

struct buffer
{
  static int const buf_size = 2048;
  static int const buf_reset_size = 1024;
  char * ptr;
  int readpos;
  int writepos;
  buffer(): readpos(0), writepos(0)
  {
    ptr = new char[buf_size];
  }
  ~buffer(){delete[] ptr;}
  buffer(buffer const & b)
  {
    ptr = new char[buf_size];
    memcpy(ptr, b.ptr, buf_size);
    readpos = b.readpos;
    writepos = b.writepos;
  }
  bool canread(){return writepos > readpos;}
  bool canwrite(){return writepos < buf_size;}
  void getread(char *& p, int & n)
  {
    p = ptr + readpos;
    n = writepos - readpos;
  }
  void getwrite(char *& p, int & n)
  {
    p = ptr + writepos;
    n = buf_size - writepos;
  }
  void fixread(int n)
  {
    if (n < 0) throw errstr("negative read\n", 0);
    readpos += n;
    if (readpos == writepos) {
      readpos = writepos = 0;
    } else if (readpos > buf_reset_size) {
      memcpy(ptr, ptr+readpos, writepos-readpos);
      writepos -= readpos;
      readpos = 0;
    }
  }
  void fixwrite(int n)
  {
    if (n < 0)
      throw errstr("negative write\n", 0);
    writepos += n;
  }
};

struct sock
{
  int *s;
  static set<int*> all_socks;
  operator int()
  {
    if (!s)
      return -1;
    else
      return s[0];
  }
  sock(int ss)
  {
    s = new int[2];
    s[0] = ss;
    s[1] = 1;
    all_socks.insert(s);
  }
  sock(sock const & ss)
  {
    s = ss.s;
    if (s)
      s[1]++;
  }
  void deref()
  {
    if (s && !(--s[1])) {
      try {
        close();
      } catch(errstr & e) {
        // if you want it to throw errors, call close manually
      }
      delete[] s;
      all_socks.erase(all_socks.find(s));
    }
    s = 0;
  }
  ~sock()
  {
    deref();
  }
  sock const & operator=(int ss)
  {
    deref();
    s = new int[2];
    all_socks.insert(s);
    s[0]=ss;
    return *this;
  }
  sock const & operator=(sock const & ss)
  {
    deref();
    s = ss.s;
    if (s)
      ++s[1];
    return *this;
  }
  void close()
  {
    if (!s || s[0] == -1)
      return;
    shutdown(s[0], SHUT_RDWR);
    while (::close(s[0]) < 0) {
      if (errno == EIO)
        throw errstr("close failed", errno);
      if (errno != EINTR)
        break;
      }
    s[0]=-1;
  }
  static void close_all_socks()
  {
    for (set<int*>::iterator i = all_socks.begin(); i != all_socks.end(); ++i) {
      while (::close((*i)[0]) < 0)
        if (errno != EINTR)
          break;
    }
  }
  bool read_to(buffer & buf)
  {
    if (!s)
      return false;
    char *p;
    int n;
    buf.getwrite(p, n);
    n = read(s[0], p, n);
    if (n < 1) {
      close();
      return false;
    } else
      buf.fixwrite(n);
    return true;
  }
  bool write_from(buffer & buf)
  {
    if (!s)
      return false;
    char *p;
    int n;
    buf.getread(p, n);
    n = write(s[0], p, n);
    if (n < 1) {
      close();
      return false;
    } else
      buf.fixread(n);
    return true;
  }
};
set<int*> sock::all_socks;

bool check_address_empty(string const & addr, int port)
{
  sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()");
  int yes = 1;
  tosserr(setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
      &yes, sizeof(yes)), "setsockopt");
  sockaddr_in a;
  memset (&a, 0, sizeof (a));
  if (!inet_aton(addr.c_str(), (in_addr *) &a.sin_addr.s_addr))
    throw errstr("bad ip address format", 0);
  a.sin_port = htons(port);
  a.sin_family = AF_INET;
  int r = bind(s, (sockaddr *) &a, sizeof(a));
  s.close();
  return r == 0;
}

void find_addr(string & addr, int & port)
{
  if (currport == 0) {
    currport = minport-1;
    for(int i = 0; i < 4; ++i)
      curraddr[i] = minaddr[i];
  }
  do {
    // get the next address in our list
    if (++currport > maxport) {
      currport = minport;
      for (int i = 0; i < 4; ++i) {
        if (++curraddr[i] <= maxaddr[i])
          break;
        curraddr[i] = minaddr[i];
      }
    }
    port = currport;
    addr = lexical_cast<string>(curraddr[0]) + "." +
           lexical_cast<string>(curraddr[1]) + "." +
           lexical_cast<string>(curraddr[2]) + "." +
           lexical_cast<string>(curraddr[3]);
  } while (!check_address_empty(addr, port));
}

sock start(string addr, int port)
{
  sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()");
  int yes = 1;
  tosserr(setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
      &yes, sizeof(yes)), "setsockopt");
  sockaddr_in a;
  memset (&a, 0, sizeof (a));
  if (!inet_aton(addr.c_str(), (in_addr *) &a.sin_addr.s_addr))
    throw errstr("bad ip address format", 0);
  a.sin_port = htons(port);
  a.sin_family = AF_INET;
  tosserr(bind(s, (sockaddr *) &a, sizeof(a)), "bind");
  cerr<<"bound to "<<addr<<":"<<port<<"\n";
  listen(s, 10);
  return s;
}

sock make_outgoing(int port, string const & address)
{
  sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()");

  struct sockaddr_in a;
  memset(&a, 0, sizeof(a));
  a.sin_family = AF_INET;
  a.sin_port = htons(port);

  if (!inet_aton(address.c_str(), (in_addr *) &a.sin_addr.s_addr))
    throw errstr("bad ip address format", 0);

  tosserr(connect(s, (sockaddr *) &a, sizeof (a)), "connect()");
  return s;
}

int fork_server(vector<string> const & args)
{
  int err[2];
  if (pipe(err) < 0)
    return false;
  int pid = fork();
  if (pid == -1) {
    close(err[0]);
    close(err[1]);
    cerr<<"Failed to fork server.\n";
    return false;
  } else if (pid == 0) {
    close(err[0]);
    close(0);
    close(1);
    close(2);
    sock::close_all_socks();
    if (dup2(err[1], 2) < 0) {
      exit(1);
    }

    char ** a = new char*[args.size()+1];
    for (unsigned int i = 0; i < args.size(); ++i) {
      a[i] = new char[args[i].size()+1];
      memcpy(a[i], args[i].c_str(), args[i].size()+1);
    }
    a[args.size()] = 0;

    execvp(a[0], a);
    perror("execvp failed\n");
    exit(1);
  } else {
    close(err[1]);
    char head[256];
    int got = 0;
    int r = 0;
    bool line = false;
    // the first line output on the server's stderr will be either
    // "monotone: beginning service on <interface> : <port>" or
    // "monotone: network error: bind(2) error: Address already in use"
    do {
      r = read(err[0], head + got, 256 - got);
      if (r)
        cerr<<"Read '"<<string(head+got, r)<<"'\n";
      if (r > 0) {
        for (int i = 0; i < r && !line; ++i)
          if (head[got+i] == '\n')
            line = true;
        got += r;
      }
    } while(r > 0 && !line && got < 256);
    head[got] = 0;
    if (string(head).find("beginning service") != string::npos)
      return pid;
    kill(pid, SIGKILL);
    do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR);
    return -1;
  }
}

bool connections_allowed = true;
int total_connections = 0;

struct serverstate
{
  enum ss {remote, active, waiting, sleeping, stopping,
           stopped, shuttingdown, shutdown, unknown};
  ss state;
  int num;
  serverstate(): state(unknown), num(0) {}
  serverstate const & operator=(string const & s)
  {
    if (s == "REMOTE")
      state = remote;
    else if (s == "ACTIVE")
      state = active;
    else if (s == "WAITING")
      state = waiting;
    else if (s == "SLEEPING")
      state = sleeping;
    else if (s == "STOPPING")
      state = stopping;
    else if (s == "STOPPED")
      state = stopped;
    else if (s == "SHUTTINGDOWN")
      state = shuttingdown;
    else if (s == "SHUTDOWN")
      state = shutdown;
    return *this;
  }
  bool operator==(string const & s)
  {
    serverstate foo;
    foo = s;
    return foo.state == state;
  }
};
std::ostream & operator<<(std::ostream & os, serverstate const & ss)
{
  switch (ss.state) {
  case serverstate::remote:
    os<<"REMOTE";
    break;
  case serverstate::active:
    os<<"ACTIVE "<<ss.num;
    break;
  case serverstate::waiting:
    os<<"WAITING";
    break;
  case serverstate::sleeping:
    os<<"SLEEPING";
    break;
  case serverstate::stopping:
    os<<"STOPPING "<<ss.num;
    break;
  case serverstate::stopped:
    os<<"STOPPED";
    break;
  case serverstate::shuttingdown:
    os<<"SHUTTINGDOWN "<<ss.num;
    break;
  case serverstate::shutdown:
    os<<"SHUTDOWN";
  case serverstate::unknown:
    break;
  }
  return os;
}

struct server
{
  bool enabled;
  list<map<string, shared_ptr<server> >::iterator> by_host, by_pat;
  map<string, shared_ptr<server> >::iterator by_name;
  static map<string, shared_ptr<server> > servers_by_host;
  static map<string, shared_ptr<server> > servers_by_pattern;
  static map<string, shared_ptr<server> > servers_by_name;
  static set<shared_ptr<server> > live_servers;
  bool local;
  int pid;
  string arguments;
  string addr;
  int port;
  int connection_count;
  int last_conn_time;
  server() : enabled(true), local(false), pid(-1), port(0),
   connection_count(0), last_conn_time(0)
  {
  }
  ~server()
  {
    yeskill();
  }
  serverstate get_state()
  {
    serverstate ss;
    ss.num = connection_count;
    if (!connections_allowed) {
      if (!total_connections)
        ss.state = serverstate::shutdown;
      else
        ss.state = serverstate::shuttingdown;
    } else if (connection_count) {
      if (enabled)
        ss.state = serverstate::active;
      else
        ss.state = serverstate::stopping;
    } else if (!local)
      ss.state = serverstate::remote;
    else if (!enabled)
      ss.state = serverstate::stopped;
    else if (pid == -1)
      ss.state = serverstate::sleeping;
    else
      ss.state = serverstate::waiting;
    return ss;
  }
  void delist()
  {
    vector<string> foo;
    set_hosts(foo);
    set_patterns(foo);
    servers_by_name.erase(by_name);
    by_name = 0;
  }
  void rename(string const & n)
  {
    shared_ptr<server> me = by_name->second;
    servers_by_name.erase(by_name);
    by_name = servers_by_name.insert(make_pair(n, me)).first;
  }
  void set_hosts(vector<string> const & h)
  {
    shared_ptr<server> me = by_name->second;
    map<string, shared_ptr<server> >::iterator c;
    for (list<map<string, shared_ptr<server> >::iterator>::iterator
           i = by_host.begin(); i != by_host.end(); ++i)
      servers_by_host.erase(*i);
    by_host.clear();
    for (vector<string>::const_iterator i = h.begin(); i != h.end(); ++i) {
      c = servers_by_host.find(*i);
      if (c != servers_by_host.end()) {
        list<map<string, shared_ptr<server> >::iterator>::iterator j;
        for (j = c->second->by_host.begin(); j != c->second->by_host.end();)
          {
            list<map<string, shared_ptr<server> >::iterator>::iterator j_saved
              = j;
            ++j;
            if ((*j_saved)->first == *i)
              {
                servers_by_host.erase(*j_saved);
                c->second->by_host.erase(j_saved);
              }
          }
      }
      c = servers_by_host.insert(make_pair(*i, me)).first;
      by_host.push_back(c);
    }
  }
  void set_patterns(vector<string> const & p)
  {
    shared_ptr<server> me = by_name->second;
    map<string, shared_ptr<server> >::iterator c;
    for (list<map<string, shared_ptr<server> >::iterator>::iterator
           i = by_pat.begin(); i != by_pat.end(); ++i)
      servers_by_pattern.erase(*i);
    by_pat.clear();
    for (vector<string>::const_iterator i = p.begin(); i != p.end(); ++i) {
      c = servers_by_pattern.find(*i);
      if (c != servers_by_pattern.end()) {
        list<map<string, shared_ptr<server> >::iterator>::iterator j;
        for (j = c->second->by_pat.begin(); j != c->second->by_pat.end(); ++j)
          if ((*j)->first == *i) {
            servers_by_pattern.erase(*j);
            c->second->by_pat.erase(j);
          }
      }
      c = servers_by_pattern.insert(make_pair(*i, me)).first;
      by_pat.push_back(c);
    }
  }
  sock connect()
  {
    if (!connections_allowed)
      throw errstr("all servers disabled");
    if (!enabled)
      throw errstr("server disabled");
    if (local && pid == -1) {
      // server needs to be started
      // we'll try 3 times, since there's a delay between our checking that
      // a port's available and the server taking it
      for (int i = 0; i < 3 && pid == -1; ++i) {
        if (i > 0 || port == 0)
          find_addr(addr, port);
        vector<string> args;
        args.push_back(monotone);
        args.push_back("serve");
        args.push_back("--bind=" + addr + ":" + lexical_cast<string>(port));
        unsigned int n = 0, m = 0;
        n = arguments.find_first_not_of(" \t");
        while (n != string::npos && m != string::npos) {
          m = arguments.find_first_of(" ", n);
          args.push_back(arguments.substr(n, m-n));
          n = arguments.find_first_not_of(" ", m);
        }
        pid = fork_server(args);
      }
    }
    sock s = make_outgoing(port, addr);
    if (local && !connection_count) {
      live_servers.insert(by_name->second);
      }
    ++connection_count;
    ++total_connections;
    return s;
  }
  void disconnect()
  {
    --total_connections;
    if (--connection_count || !local)
      return;
    last_conn_time = time(0);
    maybekill();
  }
  void maybekill()
  {
    if (!local)
      return;
    if (pid == -1)
      return;
    int difftime = time(0) - last_conn_time;
    if (!connection_count
        && (difftime > server_idle_timeout || !connections_allowed))
        yeskill();
    else if (waitpid(pid, 0, WNOHANG) > 0) {
      pid = -1;
      port = 0;
    }
  }
  void yeskill()
  {
    if (local && pid != -1) {
      kill(pid, SIGTERM);
      int r;
      do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR);
      pid = -1;
      port = 0;
      live_servers.erase(live_servers.find(by_name->second));
    }
  }
  string name()
  {
    if (local && port == 0)
      return "dynamic local server";
    else
      return addr + ":" + lexical_cast<string>(port);
  }
};

map<string, shared_ptr<server> > server::servers_by_host;
map<string, shared_ptr<server> > server::servers_by_pattern;
map<string, shared_ptr<server> > server::servers_by_name;
set<shared_ptr<server> > server::live_servers;

string getline(std::istream & in)
{
  string out;
  char buf[256];
  do {
    in.getline(buf, 256);
    int got = in.gcount()-1;
    if (got > 0)
      out.append(buf, got);
  } while (in.fail() && !in.eof());
  return out;
}

string read_server_record(std::istream & in)
{
  // server foobar
  // hostname foobar.com
  // hostname mtn.foobar.com
  // pattern com.foobar
  // remote 127.5.6.7:80
  //
  // server myproj
  // hostname localhost
  // local -d foo.db *
  vector<string> hosts, patterns;
  string name, desc;
  bool local(false);
  string line = getline(in);
  while (!line.empty()) {
    //    server     foobar
    //    ^     ^    ^
    //    a     b    c
    int a = line.find_first_not_of(" \t");
    int b = line.find_first_of(" \t", a);
    int c = line.find_first_not_of(" \t", b);
    string cmd = line.substr(a, b-a);
    string arg = line.substr(c);
    if (cmd == "server")
      name = arg;
    else if (cmd == "local") {
      local = true;
      desc = arg;
    } else if (cmd == "remote") {
      local = false;
      desc = arg;
    } else if (cmd == "host")
      hosts.push_back(arg);
    else if (cmd == "pattern")
      patterns.push_back(arg);
    line = getline(in);
  }
  if (name.empty())
    return string();
  shared_ptr<server> srv;
  map<string, shared_ptr<server> >::iterator
    i = server::servers_by_name.find(name);
  if (i != server::servers_by_name.end()) {
    if (local && i->second->local && i->second->arguments == desc)
      srv = i->second;
    else
      srv = shared_ptr<server>(new server);
    i->second->delist();
  } else
    srv = shared_ptr<server>(new server);
  srv->by_name = server::servers_by_name.insert(make_pair(name, srv)).first;
  srv->set_hosts(hosts);
  srv->set_patterns(patterns);
  if (local) {
    srv->local = true;
    srv->arguments = desc;
  } else {
    srv->local = false;
    unsigned int c = desc.find(":");
    if (c != desc.npos) {
      srv->addr = desc.substr(0, c);
      srv->port = lexical_cast<int>(desc.substr(c+1));
    } else {
      srv->addr = desc;
      srv->port = 4691;
    }
  }
  return name;
}

shared_ptr<server> get_server(string const & srv, string const & pat)
{
  map<string, shared_ptr<server> >::iterator i;
  for (i = server::servers_by_host.begin();
       i != server::servers_by_host.end(); ++i) {
    if (srv.find(i->first) == 0) {
      return i->second;
    }
  }
  for (i = server::servers_by_pattern.begin();
       i != server::servers_by_pattern.end(); ++i) {
    if (pat.find(i->first) == 0) {
      return i->second;
    }
  }
  std::cerr<<"no server found for '"<<pat<<"' at '"<<srv<<"'\n";
  return shared_ptr<server>();
}

shared_ptr<server> get_server(string const & name)
{
  map<string, shared_ptr<server> >::iterator i;
  for (i = server::servers_by_name.begin();
       i != server::servers_by_name.end(); ++i) {
    if (name == i->first) {
      return i->second;
    }
  }
  return shared_ptr<server>();
}

void kill_old_servers()
{
  set<shared_ptr<server> >::iterator i;
  for (i = server::live_servers.begin(); i != server::live_servers.end(); ++i) {
    (*i)->maybekill();
  }
}

int extract_uleb128(char *p, int maxsize, int & out)
{
  out = 0;
  int b = 0;
  unsigned char got;
  do {
    if (b == maxsize)
      return -1;
    got = p[b];
    out += ((int)(p[b] & 0x7f))<<(b*7);
    ++b;
  } while ((unsigned int)(b*7) < sizeof(int)*8-1 && (got & 0x80));
  return b;
}

int extract_vstr(char *p, int maxsize, string & out)
{
  int size;
  out.clear();
  int chars = extract_uleb128(p, maxsize, size);
  if (chars == -1 || chars + size > maxsize) {
    return -1;
  }
  out.append(p+chars, size);
  return chars+size;
}

bool extract_reply(buffer & buf, string & host, string & pat)
{
  char *p;
  int n, s;
  buf.getread(p, n);
  if (n < 4) return false;
  p += 2; // first 2 bytes are header
  n -= 2;
  // extract size, and make sure we have the entire packet
  int pos = extract_uleb128(p, n, s);
  if (pos == -1 || n < s+pos) {
    return false;
  }
  // extract host vstr
  int num = extract_vstr(p+pos, n-pos, host);
  if (num == -1) {
    return false;
  }
  pos += num;
  // extract pattern vstr
  num = extract_vstr(p+pos, n-pos, pat);
  if (num == -1) {
    cerr<<"old-style reply.\n";
    pat = host;
    host.clear();
    return true;
  }
  pos += num;
  buf.fixread(pos+2);
  return true;
}

struct channel
{
  static int counter;
  int num;
  sock cli;
  sock srv;
  bool have_routed;
  bool no_server;
  buffer cbuf;
  buffer sbuf;
  shared_ptr<server> who;
  channel(sock & c): num(++counter),
   cli(c), srv(-1),
   have_routed(false), no_server(false)
  {
    char * dat;
    int size;
    make_packet(greeting, dat, size);
    char *p;
    int n;
    sbuf.getwrite(p, n);
    if (n < size) size = n;
    memcpy(p, dat, size);
    sbuf.fixwrite(size);
    delete[] dat;

    cli.write_from(sbuf);
  }
  ~channel()
  {
    if (who && !no_server)
      who->disconnect();
  }
  bool is_finished()
  {
    return (cli == -1) && (srv == -1);
  }
  void add_to_select(int & maxfd, fd_set & rd, fd_set & wr, fd_set & er)
  {
    int c = cli;
    int s = srv;

    if (c > 0) {
      FD_SET(c, &er);
      if (cbuf.canwrite())
        FD_SET(c, &rd);
      if (sbuf.canread())
        FD_SET(c, &wr);
      maxfd = max(maxfd, c);
    }
    if (s > 0) {
      FD_SET(s, &er);
      if (sbuf.canwrite())
        FD_SET(s, &rd);
      if (cbuf.canread())
        FD_SET(s, &wr);
      maxfd = max(maxfd, s);
    }
  }
  bool process_selected(fd_set & rd, fd_set & wr, fd_set & er)
  {
    int c = cli;
    int s = srv;
/* NB: read oob data before normal reads */
    if (c > 0 && FD_ISSET(c, &er)) {
      char d;
      errno = 0;
      if (recv(c, &d, 1, MSG_OOB) < 1)
        cli.close(), c = -1;
      else
        send(s, &d, 1, MSG_OOB);
    }
    if (s > 0 && FD_ISSET(s, &er)) {
      char d;
      errno = 0;
      if (recv(s, &d, 1, MSG_OOB) < 1)
        srv.close(), s = -1;
      else
        send(c, &d, 1, MSG_OOB);
    }

    char *p=0;
    int n;

    if (c > 0 && FD_ISSET(c, &rd)) {
      if (!cli.read_to(cbuf)) c = -1;
      if (!have_routed) {
        string reply_srv, reply_pat;
        if (extract_reply(cbuf, reply_srv, reply_pat)) {
          who = get_server(reply_srv, reply_pat);
          if (who && who->enabled) {
            try {
              srv = who->connect();
              have_routed = true;
              s = srv;
            } catch (errstr & e) {
              cerr<<"cannot contact server "<<who->name()<<"\n";
              no_server = true;
            }
          } else {
            char * dat;
            int size;
            sbuf.getwrite(p, n);
            if (who)
              make_packet(srvdisabled, dat, size);
            else
              make_packet(notfound, dat, size);
            if (n < size) size = n;
            memcpy(p, dat, size);
            sbuf.fixwrite(size);
            delete[] dat;
            no_server = true;
          }
        }
      }
    }
    if (s > 0 && FD_ISSET(s, &rd)) {
      if (!srv.read_to(sbuf)) s = -1;
    }

    if (c > 0 && FD_ISSET(c, &wr)) {
      if (!cli.write_from(sbuf)) c = -1;
    }
    if (s > 0 && FD_ISSET(s, &wr)) {
      if (!srv.write_from(cbuf)) s = -1;
    }

    // close sockets we have nothing more to se