Below is the file 'database.ml' from this revision. You can also download the file.

open Viz_misc
open Viz_types

(* Evaluate [Sqlite3.init] when this module is loaded; binding it to [()]
   makes the compiler check that it has type [unit].
   NOTE(review): presumably this forces initialisation of the SQLite
   binding -- confirm in the Sqlite3 module. *)
let () = Sqlite3.init

(* Base64-encode [data], asking the encoder for a line length of 72. *)
let monot_encode data = Base64.encode ~linelength:72 data
(* Base64-decode [data]; the decoder is told to accept spaces. *)
let monot_decode data = Base64.decode ~accept_spaces:true data

(* Escape [s] for use inside a single-quoted SQL string literal: the string
   is split on every single quote (empty pieces are kept) and the pieces
   are joined again with a doubled quote. *)
let sql_escape s =
  let pieces = string_split ~collapse:false '\'' s in
  String.concat "''" pieces

(* Return [v] base64-decoded when [base64] is set, and [v] itself
   otherwise. *)
let may_decode base64 v =
  match base64 with
  | true  -> monot_decode v
  | false -> v

(* Row callback: push the (optionally decoded) first column of [row] onto
   the front of [acc]. *)
let acc_one_col base64 acc row =
  let first_column = may_decode base64 row.(0) in
  first_column :: acc



(* Configure a freshly opened database handle:
   - when the [sql] debug flag is on, install a trace callback that prints
     each traced string on stderr;
   - set the [temp_store] pragma to MEMORY;
   - install [busy_handler] when one is supplied. *)
let setup_sqlite ?busy_handler db =
  let trace_statement sql =
    prerr_string "### sql: " ;
    prerr_endline sql in
  if Viz_misc.debug "sql" then Sqlite3.trace_set db trace_statement ;
  Sqlite3.exec db "PRAGMA temp_store = MEMORY" ;
  may (Sqlite3.busy_set db) busy_handler

(* Compute an identifier for the database schema: the SQL text of all
   tables and indexes (ordered by name, [sqlite_stat] entries excluded) is
   joined with newlines, passed through [Schema_lexer.massage_sql_tokens],
   hashed with SHA1 and hex-encoded. *)
let schema_id db =
  (* [acc_one_col] conses, so the statements come back in reverse order. *)
  let rev_statements =
    Sqlite3.fetch
      db
      "SELECT sql FROM sqlite_master \
       WHERE (type = 'table' OR type = 'index') \
         AND sql IS NOT NULL \
         AND name NOT LIKE 'sqlite_stat%' \
       ORDER BY name"
      (acc_one_col false) [] in
  let normalized =
    Schema_lexer.massage_sql_tokens
      (String.concat "\n" (List.rev rev_statements)) in
  Viz_misc.hex_enc (Crypto.sha1 normalized)

(* Tell whether [sqlite_master] has an entry named [rosters]: the result
   starts at [false] and the row callback turns it into [true]. *)
let has_rosters db =
  let found_row _ _ = true in
  Sqlite3.fetch db
    "SELECT name FROM sqlite_master WHERE name = 'rosters'"
    found_row
    false

(* Decide whether values in this database are base64-encoded: always for a
   database without rosters, and for a rostered one only when its schema id
   is the single known id below. *)
let uses_base64 rosters schema_id =
  if rosters
  then schema_id = "1db80c7cee8fa966913db1a463ed50bf1b0e5b0e"
  else true

(* Fill [tbl] with the public keys stored in the database.  Each key id is
   bound to the pair (decoded RSA key, ROWID - 1).  A row whose processing
   raises [Failure] is skipped, as are rows that do not have exactly three
   columns. *)
let fetch_pubkeys db base64 tbl =
  let register id data rowid =
    try
      let key = Crypto.decode_rsa_pubkey (may_decode base64 data) in
      Hashtbl.add tbl id (key, int_of_string rowid - 1)
    with Failure _ -> () in
  Sqlite3.fetch db
    "SELECT id, keydata, ROWID FROM public_keys"
    (fun () row ->
      match row with
      | [| id; data; rowid |] -> register id data rowid
      | _ -> ())
    ()

(* Return the sorted list of distinct values of the [branch] certs,
   decoded according to [base64]. *)
let fetch_branches base64 db =
  let branches =
    Sqlite3.fetch db
      "SELECT DISTINCT value FROM revision_certs WHERE name = 'branch'"
      (acc_one_col base64)
      [] in
  List.sort compare branches

(* Names of the temporary tables created by [fetch_with_view]: the first
   holds the revision ids selected by the branch part of a query, the
   second the subset of those ids that also passes the date limit. *)
let view_name_domain = "ids_of_branch"
let view_name_limit  = "ids_of_branch_with_date_limit"

(* Convert a boolean into an SQL integer value: 1 for true, 0 for false. *)
let sql_of_bool b = `INT (if b then 1 else 0)

(* Add [v] to the id set [s] only when [t] holds and [v] is not the empty
   string; otherwise return [s] unchanged. *)
let id_set_add_if t v s =
  if not t || v = "" then s else IdSet.add v s

(* Record in [nodes] that node [id] is related to [rel_id] by [rel].
   - unknown [id]: a new node of the given [kind] is created with that single
     family entry;
   - known [id] already related to [rel_id]: [nodes] is returned as is;
   - known [id] otherwise: the entry is prepended to its family ([kind] is
     ignored in that case). *)
let add_node kind id rel_id rel nodes =
  let existing =
    try Some (NodeMap.find id nodes)
    with Not_found -> None in
  match existing with
  | None ->
      NodeMap.add id
        { id = id ; kind = kind ; family = [ rel_id, rel ] }
        nodes
  | Some node ->
      if List.mem_assoc rel_id node.family
      then nodes
      else
        NodeMap.add id
          { node with family = (rel_id, rel) :: node.family }
          nodes


(* Add the edge [parent] -> [child] to the graph [g].
   Both endpoints are registered as nodes (each one recording the other in
   its family), the edge is stored as [SAME_BRANCH] when both kinds are
   [REGULAR] and as [BRANCHING] otherwise, and any endpoint whose kind is
   not [REGULAR] is added to the neighbour set.
   Asserts that at least one endpoint is [REGULAR] and that neither id is
   empty. *)
let process_ancestry_row g parent parent_kind child child_kind =
  assert (parent_kind = REGULAR || child_kind = REGULAR) ;
  assert (parent <> "" && child <> "") ;
  (* the child is registered first, then the parent *)
  let nodes_with_child = add_node child_kind child parent PARENT g.nodes in
  let nodes = add_node parent_kind parent child CHILD nodes_with_child in
  let edge_kind =
    if parent_kind = REGULAR && child_kind = REGULAR
    then SAME_BRANCH
    else BRANCHING in
  let ancestry = EdgeMap.add (parent, child) edge_kind g.ancestry in
  let neighbours =
    id_set_add_if (child_kind <> REGULAR) child g.neighbour_nodes in
  let neighbours =
    id_set_add_if (parent_kind <> REGULAR) parent neighbours in
  { nodes = nodes ; ancestry = ancestry ; neighbour_nodes = neighbours }

(* Row callback for (parent, child) rows where both ends are regular nodes.
   - A row with an empty parent describes a node without parent: the child
     is added as an isolated [REGULAR] node unless it is already known, in
     which case the graph is returned unchanged.  Such a row must never
     reach [process_ancestry_row], which asserts that the parent id is not
     empty.
   - Any other two-column row adds a regular edge.
   - Rows of any other width are ignored. *)
let process_ancestry_row_simple g = function
  | [| "" ; child |] ->
      if NodeMap.mem child g.nodes
      then g
      else
        let new_node = { id = child ; kind = REGULAR ; family = [] } in
        { g with nodes = NodeMap.add child new_node g.nodes }

  | [| parent ; child |] ->
      process_ancestry_row g
        parent REGULAR
        child REGULAR

  | _ -> g

(* Row callback for (parent, child, flag) rows describing an edge that
   leaves the selection.  When the flag column is the string 0 the child is
   added as a [NEIGHBOUR_OUT] node; for any other flag only a [SAME_BRANCH]
   edge is recorded.  Rows of another width are ignored. *)
let process_ancestry_row_neigh_out g row =
  match row with
  | [| parent ; child ; flag |] ->
      if flag = "0"
      then process_ancestry_row g parent REGULAR child NEIGHBOUR_OUT
      else
        { g with
          ancestry = EdgeMap.add (parent, child) SAME_BRANCH g.ancestry }
  | _ -> g

(* Row callback for (parent, child, flag) rows describing an edge that
   enters the selection.  When the flag column is the string 0 the parent is
   added as a [NEIGHBOUR_IN] node; for any other flag only a [SAME_BRANCH]
   edge is recorded.  Rows of another width are ignored. *)
let process_ancestry_row_neigh_in g row =
  match row with
  | [| parent ; child ; flag |] ->
      if flag = "0"
      then process_ancestry_row g parent NEIGHBOUR_IN child REGULAR
      else
        { g with
          ancestry = EdgeMap.add (parent, child) SAME_BRANCH g.ancestry }
  | _ -> g


(* Count the [PARENT] entries in the family list of [node]. *)
let number_of_parent node =
  let is_parent = function
    | (_, PARENT) -> true
    | _ -> false in
  List.length (List.filter is_parent node.family)

(* Return [nodes] where every node with more than one parent has its kind
   replaced by [MERGE]. *)
let find_merge_nodes nodes =
  let mark_if_merge id node acc =
    if number_of_parent node <= 1
    then acc
    else NodeMap.add id { node with kind = MERGE } acc in
  NodeMap.fold mark_if_merge nodes nodes

(* Row callback: store the edge of a (parent, child) row as [BRANCHING]
   in the ancestry map of [g]; rows of another width are ignored. *)
let process_branching_edge_row g row =
  match row with
  | [| parent; child |] ->
      let ancestry = EdgeMap.add (parent, child) BRANCHING g.ancestry in
      { g with ancestry = ancestry }
  | _ -> g


(* [fetch_children db] returns a function [fun id f init] that folds [f]
   over the child ids of revision [id], starting from [init].  The SQL
   statement is prepared lazily, on the first call of the returned function,
   and reused by later calls. *)
let fetch_children db =
  let stmt = lazy
      (Sqlite3.prepare_one
         db "SELECT child FROM revision_ancestry WHERE parent = ?") in
  fun id f init ->
    let prepared = Lazy.force stmt in
    let visit acc row = f acc (Sqlite3.column_text row 0) in
    Sqlite3.reset prepared ;
    Sqlite3.bind prepared 1 (`TEXT id) ;
    Sqlite3.fold_rows visit init prepared


(* Return [nodes] where every node carrying a [tag] cert has its kind
   replaced by [TAGGED v], [v] being the (optionally decoded) cert value.
   One prepared statement is re-bound for each node id. *)
let collect_tags db base64 nodes =
  let stmt = Sqlite3.prepare_one db
      "SELECT value FROM revision_certs WHERE id = ? AND name = 'tag'" in
  let tag_node id node acc =
    Sqlite3.reset stmt ;
    Sqlite3.bind stmt 1 (`TEXT id) ;
    Sqlite3.fold_rows
      (fun acc row ->
        let tag = may_decode base64 (Sqlite3.column_blob row 0) in
        NodeMap.add id { node with kind = TAGGED tag } acc)
      acc
      stmt in
  NodeMap.fold tag_node nodes nodes

(* Build the ancestry graph for the query [(query, query_limit)].
   The temporary tables named by [view_name_domain] and [view_name_limit]
   are read here but not created here; in this file [fetch_with_view]
   creates them before calling this function.  Each step below produces a
   new [agraph] value from the previous one. *)
let fetch_agraph_with_view db base64 (query, query_limit) =
  (* Without a limit there is no separate limit table, so the domain table
     is used in both roles. *)
  let view_name_limit =
    if query_limit <> QUERY_NO_LIMIT
    then view_name_limit
    else view_name_domain in

  let agraph = Viz_types.empty_agraph in

  (* grab all node ids and edges we're interested in *)
  let agraph =
    Sqlite3.fetch_f db process_ancestry_row_simple agraph
      "SELECT parent, child FROM revision_ancestry, %s \
        WHERE parent = id AND child IN %s" view_name_limit view_name_limit in

  (* also grab neighbor nodes *)
  (* The third column tells whether the outside end of the edge still
     belongs to the domain table; the row callbacks test it against 0. *)
  let agraph =
    Sqlite3.fetch_f db process_ancestry_row_neigh_out agraph
      "SELECT parent, child, child IN %s FROM revision_ancestry
        WHERE parent IN %s AND child NOT IN %s"
      view_name_domain view_name_limit view_name_limit in
  let agraph =
    Sqlite3.fetch_f db process_ancestry_row_neigh_in agraph
      "SELECT parent, child, parent IN %s FROM revision_ancestry
        WHERE child IN %s AND parent != '' AND parent NOT IN %s"
      view_name_domain view_name_limit view_name_limit in

  (* find merge/propagate nodes (they have more than one parent) *)
  let agraph =
    { agraph with nodes = find_merge_nodes agraph.nodes } in

  (* get tags *)
  let agraph =
    { agraph with nodes = collect_tags db base64 agraph.nodes } in

  (* determine the branching edges *)
  let agraph =
    begin
      match query with
      | QUERY_BRANCHES [ _ ] ->
          (* we already have the branching edges *)
	  agraph
      | _ ->
	  (* we need another database query *)
	  (* It selects the edges whose two ends share no branch cert value. *)
	  Sqlite3.fetch_f db
	    process_branching_edge_row agraph
            "SELECT parent, child \
               FROM revision_ancestry AS A \
              WHERE A.child IN %s AND A.parent != '' AND \
	            NOT EXISTS \
                     (SELECT P.id FROM revision_certs AS C, revision_certs AS P \
                       WHERE C.id = A.child AND P.id = A.parent \
                             AND C.name = 'branch' AND P.name = 'branch' \
			     AND C.value = P.value)"
            view_name_limit
    end in

  (* reconnect disconnected components *)
  (* Skipped for QUERY_ALL; otherwise [Components.reconnect] is given a way
     to look up the children of a revision. *)
  let agraph =
    if query = QUERY_ALL
    then agraph
    else Components.reconnect (fetch_children db) agraph in

  agraph


(* Render [s] as an SQL literal matching the storage format: a quoted
   base64 string when [base64] is set, a hexadecimal blob literal (prefix X)
   otherwise. *)
let encode_quote base64 s =
  if base64
  then "\'" ^ monot_encode s ^ "\'"
  else "X\'" ^ Viz_misc.hex_enc s ^ "\'"

(* Run [f db base64 query] with the temporary tables for [query] in place.
   [query] is a pair (domain, limit).  Table creation, the call to [f] and
   table removal are handed to [Viz_misc.bracket] as [~before], [~action]
   and [~after].
   NOTE(review): presumably [bracket] runs [~after] even when [~action]
   raises -- confirm in Viz_misc. *)
let fetch_with_view query base64 db f =
  let (query_domain, query_limit) = query in

  (* For a date-range limit, register the SQL function [date_p]: it decodes
     its argument according to [base64] and yields 1 when the value lies
     between [d1] and [d2] (inclusive, string comparison), 0 otherwise. *)
  let register_date_p () =
    match query_limit with
    | QUERY_BETWEEN (d1, d2) ->
	Sqlite3.create_fun_1 db "date_p"
	  (fun arg ->
	    let d =
	      if base64
	      then monot_decode (Sqlite3.value_text arg)
	      else Sqlite3.value_blob arg in
	    sql_of_bool (d1 <= d && d <= d2))
    | _ -> () in

  (* SQL creating the domain table: every id with a branch cert, or only
     the ids whose branch cert value is one of the requested branches. *)
  let view_query_domain =
    match query_domain with
    | QUERY_ALL -> Printf.sprintf
	  "CREATE TEMP TABLE %s AS \
           SELECT DISTINCT id FROM revision_certs
            WHERE name = 'branch'" view_name_domain
    | QUERY_BRANCHES q -> Printf.sprintf
	  "CREATE TEMP TABLE %s AS \
	   SELECT DISTINCT id FROM revision_certs \
            WHERE name = 'branch' AND value IN (%s)"
	  view_name_domain
	  (String.concat ", "
	     (List.map (encode_quote base64) q)) in

  (* SQL creating the limit table: the ids of the domain table whose date
     cert is accepted by [date_p]. *)
  let view_query_date_limit () =
    Printf.sprintf
      "CREATE TEMP TABLE %s AS \
       SELECT DISTINCT id FROM revision_certs \
        WHERE name = 'date' AND id IN %s AND date_p(value)"
      view_name_limit view_name_domain in

  Viz_misc.bracket
    ~before:(fun () ->
      (* We fetch the ids matching the query (ie those on certain branches) *)
      (* and store them in a view. *)
      Sqlite3.exec db view_query_domain ;
      Sqlite3.exec_f db
	"CREATE INDEX %s__id ON %s (id)" view_name_domain view_name_domain ;
      if query_limit <> QUERY_NO_LIMIT
      then begin
	register_date_p () ;
	Sqlite3.exec db (view_query_date_limit ()) ;
	Sqlite3.exec_f db
	  "CREATE INDEX %s__id ON %s (id)" view_name_limit view_name_limit
      end)
    ~action:(fun () -> f db base64 query)
    ~after:(fun () ->
      (* Undo the setup: remove [date_p] and the limit table when a limit
         was in use, then remove the domain table. *)
      if query_limit <> QUERY_NO_LIMIT
      then begin
	Sqlite3.delete_function db "date_p" ;
	Sqlite3.exec_f db "DROP TABLE %s" view_name_limit
      end ;
      Sqlite3.exec_f db "DROP TABLE %s" view_name_domain)
    ()

(* Fetch the ancestry graph for [query].  Any exception is reported on
   stderr and then raised again. *)
let fetch_agraph query base64 db =
  try fetch_with_view query base64 db fetch_agraph_with_view
  with exn ->
    let description = Printexc.to_string exn in
    Printf.eprintf "fetch_agraph exn: %s\n%!" description ;
    raise exn

(* Inflate the gzip data [s] and parse it as a revision set, using the
   rostered grammar entry point when [rostered] is set and the pre-rosters
   one otherwise. *)
let decode_and_parse_revision rostered s =
  let parse =
    if rostered
    then Revision_parser.revision_set
    else Revision_parser.pre_rosters_revision_set in
  let inflated = Unzip.inflate_str ~kind:Unzip.GZip s in
  let lexbuf = Lexing.from_string inflated in
  parse Revision_lexer.lex lexbuf

(* Fetch and parse the revision set stored for revision [id].
   [id] is escaped with [sql_escape] before being inserted into the quoted
   SQL literal, so a quote inside [id] cannot break the statement.
   Raises [Failure] with a message naming [id] when the [revisions] table
   has no row for it; exceptions of [decode_and_parse_revision] and of the
   database layer are passed on. *)
let fetch_revision_set rostered b64 db id =
  let rows =
    Sqlite3.fetch_f db (acc_one_col b64) []
      "SELECT data FROM revisions WHERE id = '%s'" (sql_escape id) in
  match rows with
  | [] ->
      failwith (Printf.sprintf "revision %s not found in database" id)
  | data :: _ ->
      decode_and_parse_revision rostered data

(* Check the signature of a cert.  The signed text has the form
   [name@id:base64(value)] enclosed in square brackets.
   Returns [SIG_OK] or [SIG_BAD] according to the verification, and
   [SIG_UNKNOWN] when [Not_found] is raised, e.g. because [keypair] is not
   in [pubkeys]. *)
let verify_cert_sig pubkeys keypair name id v signature =
  try
    let (pubkey, _) = Hashtbl.find pubkeys keypair in
    let signed_text =
      Printf.sprintf "[%s@%s:%s]" name id (Base64.encode v) in
    match Crypto.rsa_sha1_verify pubkey signed_text signature with
    | true  -> SIG_OK
    | false -> SIG_BAD
  with Not_found -> SIG_UNKNOWN

(* Row callback: turn an (id, name, value, keypair, signature) row into a
   cert record with its signature status and push it onto [acc].  Value and
   signature are decoded according to [b64].  Rows of another width are
   ignored. *)
let process_certs pubkeys b64 acc row =
  match row with
  | [| id; name; v; keypair; signature |] ->
      let value = may_decode b64 v in
      let raw_signature = may_decode b64 signature in
      let status =
        verify_cert_sig pubkeys keypair name id value raw_signature in
      { c_id = id ;
        c_name = name ;
        c_value = value ;
        c_signer_id = keypair ;
        c_signature = status } :: acc
  | _ -> acc

(* Fetch all certs attached to revision [id], each with its signature
   status (see [process_certs]).
   [id] is escaped with [sql_escape] before being inserted into the quoted
   SQL literal, so a quote inside [id] cannot break the statement. *)
let fetch_certs db pubkeys b64 id =
  Sqlite3.fetch_f db (process_certs pubkeys b64) []
    "SELECT id, name, value, keypair, signature \
       FROM revision_certs WHERE id = '%s'" (sql_escape id)

(* Prepare the statement selecting the signer ([keypair] column) of the
   certs with a given id (parameter 1) and name (parameter 2). *)
let prepare_fetch_one_cert_signer db =
  let sql =
    "SELECT keypair FROM revision_certs WHERE id = ? AND name = ?" in
  Sqlite3.prepare_one db sql
(* Prepare the statement selecting the [value] column of the certs with a
   given id (parameter 1) and name (parameter 2). *)
let prepare_fetch_one_cert_value db =
  let sql =
    "SELECT value   FROM revision_certs WHERE id = ? AND name = ?" in
  Sqlite3.prepare_one db sql

(* Run the prepared statement [stmt] for cert [name] of revision [id] and
   collect the first column of every row.  With kind [`VALUE_B64] each
   value is base64-decoded; with [`SIGNER] or [`VALUE] it is kept as is.
   Each result is consed onto the accumulator. *)
let fetch_one_cert_field stmt id name kind =
  let convert =
    match kind with
    | `SIGNER
    | `VALUE -> (fun v -> v)
    | `VALUE_B64 -> monot_decode in
  Sqlite3.reset stmt ;
  Sqlite3.bind stmt 1 (`TEXT id) ;
  Sqlite3.bind stmt 2 (`TEXT name) ;
  Sqlite3.fold_rows
    (fun acc row -> convert (Sqlite3.column_text row 0) :: acc)
    []
    stmt

(* Return the (revision id, decoded value) pairs of all certs called [name]
   whose decoded value satisfies the predicate [p].  The accumulated list
   is reversed before being returned. *)
let get_matching_cert db b64 name p =
  let keep_if_matching acc row =
    match row with
    | [| id; v |] ->
        let value = may_decode b64 v in
        if p value then (id, value) :: acc else acc
    | _ -> acc in
  let matches =
    Sqlite3.fetch_f db keep_if_matching []
      "SELECT id, value FROM revision_certs WHERE name = '%s'" name in
  List.rev matches



(* Start monotone as a subprocess on database [db_fname].
   [cmd] is the list of arguments placed after the [--db] option, [input]
   is handed to [Subprocess.spawn], [status] is an object with [push] and
   [pop] methods used to show a message while monotone runs, and [cb]
   receives either [`OUTPUT stdout] (exit status 0) or [`SUB_PROC_ERROR msg].
   A [Gspawn.Error] is reported through [Viz_types.errorf]. *)
let spawn_monotone monotone_exe db_fname cmd input status cb =
  let cmd = monotone_exe :: "--db" :: db_fname :: cmd in
  if Viz_misc.debug "exec"
  then Printf.eprintf "### exec: Running '%s'\n%!" (String.concat " " cmd) ;
  try
    (* NOTE(review): the message pushed here is popped by the reap callback
       only; if spawning fails with [Gspawn.Error] it may never be popped.
       Confirm how [Subprocess.spawn] treats [reap_callback] on failure. *)
    status#push "Running monotone ..." ;
    Subprocess.spawn
      ~encoding:`NONE ~cmd ~input
      ~reap_callback:status#pop
      (* In this callback [status] is the integer exit status and shadows
         the status object. *)
      (fun ~exceptions ~stdout ~stderr status ->
	if status = 0
	then
	  cb (`OUTPUT stdout)
	else
	  (* [error] formats a message and passes it to [cb]. *)
	  let error fmt =
	    Printf.kprintf (fun s -> cb (`SUB_PROC_ERROR s)) fmt in
	  (* Prefer what monotone wrote on stderr; with an empty stderr fall
	     back to the exit status and the collected exceptions. *)
	  if stderr = ""
	  then
	    error "Monotone exited with status %d:\n%s" status
	      (String.concat "\n" (List.map Printexc.to_string exceptions))
	  else
	    error "Monotone error:\n%s" stderr)
  with Gspawn.Error (_, msg) ->
    Viz_types.errorf "Could not execute monotone:\n%s" msg







(* An open monotone database, as built by [open_db]. *)
type t = {
    (* path the database was opened from; also used in error messages *)
    filename  : string ;
    (* the SQLite handle *)
    db        : Sqlite3.db ;
    (* key id -> (public key, ROWID - 1), filled by [fetch_pubkeys] *)
    pubkeys   : (string, Crypto.pub_rsa_key * int) Hashtbl.t ;
    (* prepared statements: index 0 selects cert signers, index 1 cert
       values *)
    stmts     : Sqlite3.stmt array ;
    (* result of [has_rosters] *)
    rostered  : bool ;
    (* result of [uses_base64]: whether stored values are base64-encoded *)
    base64    : bool ;
    (* result of [schema_id] *)
    schema_id : string
  }


(* Apply [f] to the SQLite handle of [d], turning a [Sqlite3.Error] into a
   [Viz_types.errorf] report that names the database file. *)
let sqlite_try f d =
  try f d.db
  with Sqlite3.Error (_, reason) ->
    Viz_types.errorf "Error processing database %s:\n%s" d.filename reason




(* Open the monotone database stored in file [fname].
   The handle is configured with [setup_sqlite] (passing [busy_handler] on),
   the two cert statements are prepared, the storage format is detected
   (rosters, schema id, base64) and the public keys are loaded.

   Errors are reported through [Viz_types.errorf] when the file does not
   exist, when it cannot be opened, or when SQLite fails during setup.
   Whatever exception interrupts the setup, the handle is closed before
   the exception leaves this function, so that it is not leaked. *)
let open_db ?busy_handler fname =
  if not (Sys.file_exists fname)
  then Viz_types.errorf "No such file: %s" fname ;
  let db =
    try Sqlite3.open_db fname
    with Sqlite3.Error (_, msg) ->
      Viz_types.errorf "Could not open database %s:\n%s" fname msg in
  let pubkeys = Hashtbl.create 17 in
  try
    setup_sqlite ?busy_handler db ;
    let stmts = [| prepare_fetch_one_cert_signer db ;
                   prepare_fetch_one_cert_value db |] in
    let rostered = has_rosters db in
    let schema   = schema_id db in
    let base64   = uses_base64 rostered schema in
    fetch_pubkeys db base64 pubkeys ;
    { filename  = fname ;
      db        = db ;
      pubkeys   = pubkeys ;
      stmts     = stmts ;
      rostered  = rostered ;
      base64    = base64 ;
      schema_id = schema
    }
  with
  | Sqlite3.Error (_, msg) ->
      Sqlite3.close_db db ;
      Viz_types.errorf "Error processing database %s:\n%s" fname msg
  | exn ->
      (* Any other failure: release the handle and let the original
         exception through.  An SQLite error raised by the close itself is
         dropped so that it cannot hide [exn]. *)
      (try Sqlite3.close_db db with Sqlite3.Error _ -> ()) ;
      raise exn

(* Close the SQLite handle of the database [d]. *)
let close_db d =
  Sqlite3.close_db d.db

(* Run [f db] with [prg] installed as SQLite progress handler (with the
   value 2000 as second argument of the setter).  The handler is removed
   again whether [f] returns or raises; an exception is raised again after
   the removal. *)
let with_progress prg f db =
  let handle = db.db in
  Sqlite3.progress_handler_set handle 2000 prg ;
  try
    let outcome = f db in
    Sqlite3.progress_handler_unset handle ;
    outcome
  with exn ->
    Sqlite3.progress_handler_unset handle ;
    raise exn

(* Return the file name the database was opened from. *)
let get_filename { filename = filename } = filename

(* Sorted list of the branch names found in the database [d]; SQLite
   errors are reported through [sqlite_try].  This definition shadows the
   lower-level [fetch_branches] it calls. *)
let fetch_branches d =
  let list_branches = fetch_branches d.base64 in
  sqlite_try list_branches d

(* Ancestry graph of the database [d] for [query]; SQLite errors are
   reported through [sqlite_try]. *)
let fetch_ancestry_graph d query =
  let build_graph = fetch_agraph query d.base64 in
  sqlite_try build_graph d

(* Fetch revision [id] from the database [d] and return it as a record
   with an empty cert list.  Each edge of the parsed revision set is
   reduced to the pair (old revision, change set).  A parse error is
   reported through [Viz_types.errorf]. *)
let fetch_revision d id =
  try
    let (manifest_id, edges) =
      sqlite_try
        (fun db -> fetch_revision_set d.rostered d.base64 db id)
        d in
    let changes =
      List.map
        (fun e ->
          (e.Revision_types.old_revision, e.Revision_types.change_set))
        edges in
    { revision_id = id ;
      manifest_id = manifest_id ;
      revision_set = changes ;
      certs = [] }
  with Parsing.Parse_error ->
    Viz_types.errorf "Error while parsing revision set of %s" id

(* Fetch revision [id] together with its certs: the record returned by
   [fetch_revision] with its [certs] field filled in by [fetch_certs]. *)
let fetch_certs_and_revision d id =
  let revision = fetch_revision d id in
  let certs =
    sqlite_try (fun db -> fetch_certs db d.pubkeys d.base64 id) d in
  { revision with certs = certs }

(* Signers of the certs called [name] on revision [id], obtained with the
   prepared statement stored at index 0. *)
let fetch_cert_signer db id name =
  let signer_stmt = db.stmts.(0) in
  sqlite_try
    (fun _ -> fetch_one_cert_field signer_stmt id name `SIGNER)
    db

(* Values of the certs called [name] on revision [id], obtained with the
   prepared statement stored at index 1 and base64-decoded when the
   database uses base64. *)
let fetch_cert_value db id name =
  let value_stmt = db.stmts.(1) in
  let kind =
    match db.base64 with
    | true  -> `VALUE_B64
    | false -> `VALUE in
  sqlite_try
    (fun _ -> fetch_one_cert_field value_stmt id name kind)
    db

(* Return the number stored with public key [id] (its ROWID minus one, see
   [fetch_pubkeys]).  Raises [Not_found] when the key is unknown. *)
let get_key_rowid db id =
  snd (Hashtbl.find db.pubkeys id)

(* (revision id, tag) pairs for every [tag] cert whose value satisfies
   [p]. *)
let get_matching_tags d p =
  get_matching_cert d.db d.base64 "tag" p

(* (revision id, date) pairs for every [date] cert whose value is accepted
   by [string_is_prefix d_pref]. *)
let get_matching_dates db d_pref =
  let matches_prefix = string_is_prefix d_pref in
  get_matching_cert db.db db.base64 "date" matches_prefix

(* (revision id, value) pairs for every [branch] cert whose value is
   accepted by [string_is_prefix id_pref].
   NOTE(review): despite the name, the prefix is tested against the value
   of the branch certs, not against the revision id -- confirm that this is
   intended. *)
let get_matching_ids db id_pref =
  let matches_prefix = string_is_prefix id_pref in
  get_matching_cert db.db db.base64 "branch" matches_prefix

(* Spawn monotone to compute the diff between revisions [old_id] and
   [new_id]; the result of [spawn_monotone] is discarded and the outcome is
   delivered to [cb]. *)
let run_monotone_diff db monotone_exe status cb (old_id, new_id) =
  let args =
    [ "--revision" ; old_id ;
      "--revision" ; new_id ; "diff" ] in
  ignore (spawn_monotone monotone_exe db.filename args None status cb)


(* Encode one [select] command per selector as input for monotone
   automate stdio.  Each command is the text l6:select, then the selector
   prefixed by its length and a colon, then the letter e and a newline.
   The encoded text is also sent to the [stdio] log. *)
let encode_automate_stdio selectors =
  let b = Buffer.create 512 in
  let add_command sel =
    Buffer.add_string b "l6:select" ;
    Buffer.add_string b (string_of_int (String.length sel)) ;
    Buffer.add_char b ':' ;
    Buffer.add_string b sel ;
    Buffer.add_string b "e\n" in
  List.iter add_command selectors ;
  let encoded = Buffer.contents b in
  Viz_misc.log "stdio" "stdio input: %S" encoded ;
  encoded

(* Decode the output of monotone automate stdio.

   [s] is read as a sequence of packets, each made of four fields ended by
   a colon -- command number, one-digit error code, a flag that is the
   letter l on the last packet of a command, payload length -- followed by
   the payload itself.  The payloads of the packets of one command are
   concatenated.

   Returns the [(command number, output)] pairs in the order in which the
   commands ended.

   Raises [Failure] with the accumulated payload as message when a command
   ends with a non-zero error code, and [Failure] as well when [s] is not
   well-formed (missing separator, bad number, truncated payload). *)
let decode_automate_stdio s =
  let rec loop acc cmd_buf i =
    if i >= String.length s
    then List.rev acc
    else begin
      let c1 = String.index_from s i ':' in
      let number = int_of_string (string_slice ~s:i ~e:c1 s) in
      let code   = int_of_char s.[c1 + 1] - int_of_char '0' in
      let c2 = String.index_from s (c1 + 1) ':' in
      let last   = s.[c2 + 1] in
      let c3 = String.index_from s (c2 + 1) ':' in
      let c4 = String.index_from s (c3 + 1) ':' in
      let len   = int_of_string (string_slice ~s:(c3 + 1) ~e:c4 s) in
      (* The payload of every packet is appended here, exactly once;
         appending it a second time for non-final packets would duplicate
         their data in the output. *)
      Buffer.add_substring cmd_buf s (c4 + 1) len ;

      match code with
      | 0 when last = 'l' ->
          let output = Buffer.contents cmd_buf in
          Buffer.clear cmd_buf ;
          loop ((number, output) :: acc) cmd_buf (c4 + 1 + len)
      | _ when last = 'l' ->
          let msg = Buffer.contents cmd_buf in
          Viz_misc.log "stdio" "got a stdio error (code=%d): %S" code msg ;
          failwith msg
      | _ ->
          (* more packets follow for this command *)
          loop acc cmd_buf (c4 + 1 + len)
    end in
  (* Callers handle [Failure]; report malformed input the same way instead
     of letting [Not_found] or [Invalid_argument] escape. *)
  try loop [] (Buffer.create 1024) 0
  with Not_found | Invalid_argument _ ->
    failwith "malformed automate stdio output"


(* Split the output of every command into lines, gather all the lines in
   one list (lines of later commands first) and pass it through
   [Viz_misc.list_uniq]. *)
let collect_ids stdio_output =
  let prepend_lines acc (_, output) =
    string_split '\n' output @ acc in
  let all_lines = List.fold_left prepend_lines [] stdio_output in
  Viz_misc.list_uniq all_lines

(* Run the given selectors through monotone automate stdio.  [cb] receives
   [`IDS ids] when the output could be decoded, and [`SUB_PROC_ERROR msg]
   when monotone failed or when decoding raised [Failure msg]. *)
let run_monotone_select db monotone_exe status cb selectors =
  let on_result = function
    | `SUB_PROC_ERROR _ as err ->
        cb err
    | `OUTPUT s ->
        let outcome =
          try `IDS (collect_ids (decode_automate_stdio s))
          with Failure msg -> `SUB_PROC_ERROR msg in
        cb outcome in
  let input = Some (encode_automate_stdio selectors) in
  spawn_monotone
    monotone_exe db.filename [ "automate" ; "stdio" ]
    input
    status
    on_result