(* File: database.ml *)

open Viz_misc
open Viz_types

(* Force evaluation of [Sqlite3.init] (a unit value) when this module is
   loaded. *)
let () = Sqlite3.init

(* Base64 helpers: encoding is done with a line length of 72, decoding
   accepts embedded spaces. *)
let monot_encode data = Base64.encode ~linelength:72 data
let monot_decode data = Base64.decode ~accept_spaces:true data

(* [may_decode base64 v] passes [v] through [monot_decode] when [base64]
   is true and returns it untouched otherwise. *)
let may_decode base64 v =
  match base64 with
  | true -> monot_decode v
  | false -> v

(* Read column [n] of the current row of [stmt] as a blob and run it
   through [may_decode]. *)
let blob_col base64 stmt n =
  let raw = Sqlite3.column_blob stmt n in
  may_decode base64 raw
(* Fold step: push the (possibly decoded) first column of the current row
   onto [acc]. Rows therefore end up in reverse fetch order. *)
let acc_one_col base64 acc stmt =
  let first_column = blob_col base64 stmt 0 in
  first_column :: acc



(* Per-connection setup:
   - when the "sql" debug flag is on, trace every statement to stderr,
     prefixed with the current [Sys.time];
   - keep temporary storage in memory;
   - hand the optional [busy_handler] to [Sqlite3.busy_set] through
     [may]. *)
let setup_sqlite ?busy_handler db =
  let trace_statement sql =
    Printf.eprintf "### %2.3f sql: %s\n%!" (Sys.time ()) sql in
  if Viz_misc.debug "sql" then Sqlite3.trace_set db trace_statement ;
  Sqlite3.exec db "PRAGMA temp_store = MEMORY" ;
  may (Sqlite3.busy_set db) busy_handler

(* Compute an identifier for the database schema: the SQL text of every
   table and index (statistics tables excluded), ordered by name, joined
   with newlines, massaged by [Schema_lexer.massage_sql_tokens], hashed
   with SHA1 and hex-encoded. *)
let schema_id db =
  let query =
    "SELECT sql FROM sqlite_master \
     WHERE (type = 'table' OR type = 'index') \
     AND sql IS NOT NULL \
     AND name NOT LIKE 'sqlite_stat%' \
     ORDER BY name" in
  (* [acc_one_col] accumulates in reverse, hence the [List.rev]. *)
  let rows_rev = Sqlite3.fetch db query (acc_one_col false) [] in
  let sql_text = String.concat "\n" (List.rev rows_rev) in
  let massaged = Schema_lexer.massage_sql_tokens sql_text in
  Viz_misc.hex_enc (Crypto.sha1 massaged)

(* True when [sqlite_master] has an entry named 'rosters': the fold
   starts at [false] and flips to [true] on any returned row. *)
let has_rosters db =
  let found _ _ = true in
  Sqlite3.fetch db
    "SELECT name FROM sqlite_master WHERE name = 'rosters'"
    found
    false

(* Decide whether stored values must be base64-decoded: always when the
   database has no rosters, and otherwise only for one specific schema id
   (presumably an early rostered schema -- TODO confirm). *)
let uses_base64 rosters schema_id =
  if rosters
  then schema_id = "1db80c7cee8fa966913db1a463ed50bf1b0e5b0e"
  else true

(* Fill [tbl] with every row of [public_keys]: key id -> (decoded RSA
   key, ROWID - 1). Rows whose key data makes the decoder raise
   [Failure] are skipped. *)
let fetch_pubkeys db base64 tbl =
  let register () stmt =
    let key_id   = Sqlite3.column_text stmt 0 in
    let raw_key  = blob_col base64 stmt 1 in
    let row      = Sqlite3.column_int stmt 2 in
    try
      let decoded = Crypto.decode_rsa_pubkey raw_key in
      Hashtbl.add tbl key_id (decoded, row - 1)
    with Failure _ -> () in
  Sqlite3.fetch db
    "SELECT id, keydata, ROWID FROM public_keys"
    register
    ()

(* List every branch name found in 'branch' certs together with the
   number of certs carrying it, sorted with the polymorphic [compare]. *)
let fetch_branches base64 db =
  let add_row rows stmt =
    let branch = blob_col base64 stmt 0 in
    let count  = Sqlite3.column_int stmt 1 in
    (branch, count) :: rows in
  let rows =
    Sqlite3.fetch db
      "SELECT value, COUNT(id) FROM revision_certs WHERE name = 'branch' GROUP BY value"
      add_row
      [] in
  List.sort compare rows

(* Names of the temporary tables used while answering a query: the ids
   selected by the branch domain, and the same ids further restricted by
   a date limit. *)
let view_name_domain = "ids_of_branch"
let view_name_limit  = "ids_of_branch_with_date_limit"

(* Encode a boolean as the SQL integer value 1 (true) or 0 (false). *)
let sql_of_bool b = `INT (if b then 1 else 0)



(* Ids of the relatives of node [n] whose relation equals [r]. The
   result lists them in the reverse of their order in [n.family]. *)
let get_relative n r =
  let matching = List.filter (fun (_, rel) -> r = rel) n.family in
  List.rev_map fst matching

(* Number of PARENT entries recorded in the family of node [n]. *)
let count_parents n =
  let parents = get_relative n PARENT in
  List.length parents

(* Mark as MERGE, in place, every REGULAR node of [g] that has more than
   one parent. Nodes of any other kind are left alone. *)
let find_merge_nodes g =
  let promote _ node =
    match node.kind with
    | REGULAR when count_parents node > 1 -> node.kind <- MERGE
    | _ -> () in
  NodeMap.iter promote g.nodes

(* Bind [id] as the text parameter of [stmt] and return the integer in
   column 0 of the last row fetched, or 0 when there is no row. *)
let grab_one_int stmt id =
  let read_int _ row = Sqlite3.column_int row 0 in
  Sqlite3.bind_fetch stmt [ `TEXT id ] read_int 0

(* [count_all_parents db] prepares its statement once and returns a
   function counting the non-empty parents of a revision id in
   [revision_ancestry]. *)
let count_all_parents db =
  let sql =
    "SELECT COUNT(parent) FROM revision_ancestry WHERE parent != '' AND child = ?" in
  let stmt = Sqlite3.prepare_one db sql in
  fun id -> grab_one_int stmt id

(* [count_regular_children db] prepares its statement once and returns a
   function counting the children of a revision id that are listed in
   the domain table ([view_name_domain]). *)
let count_regular_children db =
  let sql =
    Printf.sprintf
      "SELECT COUNT(child) FROM revision_ancestry, %s WHERE parent = ? AND child = id"
      view_name_domain in
  let stmt = Sqlite3.prepare_one db sql in
  fun id -> grab_one_int stmt id

(* Build the predicate deciding whether an outgoing neighbour edge
   [id_parent -> id] is worth showing: either [id] has exactly one
   parent, or [id_parent] has no child inside the domain table. The
   second count is only queried when the first test fails. *)
let is_interesting_neighbour_out db =
  let parents_of = count_all_parents db in
  let domain_children_of = count_regular_children db in
  fun id_parent id ->
    if parents_of id = 1
    then true
    else domain_children_of id_parent = 0

(* [fetch_children db] prepares its statement once and returns a fold:
   [fold id f init] applies [f] to the accumulator and each child id of
   revision [id] found in [revision_ancestry]. *)
let fetch_children db =
  let stmt =
    Sqlite3.prepare_one db
      "SELECT child FROM revision_ancestry WHERE parent = ?" in
  fun id f init ->
    let step acc row = f acc (Sqlite3.column_text row 0) in
    Sqlite3.bind_fetch stmt [ `TEXT id ] step init

(* For every 'tag' cert attached to an id of table [view], set the kind
   of the matching node of [g] to [TAGGED tag], in place.
   [NodeMap.find] raises [Not_found] if the id is not a node of [g]. *)
let collect_tags db base64 view g =
  let tag_node () row =
    let node = NodeMap.find (Sqlite3.column_text row 0) g.nodes in
    node.kind <- TAGGED (blob_col base64 row 1) in
  Sqlite3.fetch_f db
    "SELECT C.id, C.value FROM revision_certs AS C, %s AS D WHERE name = 'tag' AND C.id = D.id"
    view
    tag_node
    ()


(* Return the node of [g] named [id] together with the graph holding it.
   An existing node is returned as is (its kind is not changed to [k]);
   otherwise a fresh node of kind [k] with no family is added. *)
let ensure_node g id k =
  let existing =
    try Some (NodeMap.find id g.nodes)
    with Not_found -> None in
  match existing with
  | Some node -> (node, g)
  | None ->
      let node = { id = id ; kind = k ; family = [] } in
      (node, { g with nodes = NodeMap.add id node g.nodes })

(* Record, in place, that [n2] is a CHILD of [n1] and that [n1] is a
   PARENT of [n2]. *)
let connect_nodes n1 n2 =
  let child_link  = (n2.id, CHILD) in
  let parent_link = (n1.id, PARENT) in
  n1.family <- child_link :: n1.family ;
  n2.family <- parent_link :: n2.family

(* Return [g] with the edge [(id1, id2)] bound to kind [ek] in its
   ancestry map. *)
let add_edge g id1 id2 ek =
  let ancestry = EdgeMap.add (id1, id2) ek g.ancestry in
  { g with ancestry = ancestry }

(* Make sure both end points exist (creating them with kinds [k1] and
   [k2] if needed), link them as parent [id1] / child [id2], and record
   the edge with kind [ek]. *)
let add_nodes_with_edge g id1 k1 id2 k2 ek =
  let parent, with_parent = ensure_node g id1 k1 in
  let child, with_both = ensure_node with_parent id2 k2 in
  connect_nodes parent child ;
  add_edge with_both id1 id2 ek


(* Row handler: make sure the id in column 0 exists in [g], creating it
   as a REGULAR node when missing. *)
let process_regular_node g s =
  snd (ensure_node g (Sqlite3.column_text s 0) REGULAR)

(* Row handler for incoming neighbours: column 0 is the parent id,
   column 1 the child id, which must already be a node of [g]. The
   parent is added as NEIGHBOUR_IN with a BRANCHING_NEIGH edge. *)
let process_neighb_in g s =
  let outside_parent = Sqlite3.column_text s 0 in
  let inside_child   = Sqlite3.column_text s 1 in
  assert (NodeMap.mem inside_child g.nodes) ;
  add_nodes_with_edge
    g outside_parent NEIGHBOUR_IN inside_child REGULAR BRANCHING_NEIGH

(* Build the row handler for outgoing neighbours: column 0 is the parent
   id (already a node of [g]), column 1 the child id. The child is added
   as NEIGHBOUR_OUT with a BRANCHING_NEIGH edge when [all] is set or the
   edge passes [is_interesting_neighbour_out]; otherwise [g] is returned
   unchanged. *)
let process_neighb_out all db =
  let interesting = is_interesting_neighbour_out db in
  fun g s ->
    let inside_parent = Sqlite3.column_text s 0 in
    let outside_child = Sqlite3.column_text s 1 in
    assert (NodeMap.mem inside_parent g.nodes) ;
    match all || interesting inside_parent outside_child with
    | true ->
        add_nodes_with_edge
          g inside_parent REGULAR outside_child NEIGHBOUR_OUT BRANCHING_NEIGH
    | false -> g

(* Row handler for edges between two nodes of the view: both ends must
   already be nodes of [g] and the edge must be new. The edge is added
   with kind SAME_BRANCH. *)
let process_ancestry g s =
  let from_id = Sqlite3.column_text s 0 in
  let to_id   = Sqlite3.column_text s 1 in
  let known id = NodeMap.mem id g.nodes in
  assert (known from_id) ;
  assert (known to_id) ;
  assert (not (EdgeMap.mem (from_id, to_id) g.ancestry)) ;
  add_nodes_with_edge g from_id REGULAR to_id REGULAR SAME_BRANCH

(* Row handler: re-label the edge (column 0, column 1) as BRANCHING when
   it is currently recorded as SAME_BRANCH. Unknown edges and edges of
   any other kind leave [g] unchanged. *)
let process_branching_edge g s =
  let parent = Sqlite3.column_text s 0 in
  let child  = Sqlite3.column_text s 1 in
  let current =
    try Some (EdgeMap.find (parent, child) g.ancestry)
    with Not_found -> None in
  match current with
  | Some SAME_BRANCH -> add_edge g parent child BRANCHING
  | Some _ | None -> g


(* Build the ancestry graph for [query], assuming the temporary tables
   named by [view_name_domain] (and [view_name_limit] when a limit is
   set) already exist. The stages run in this order:
   1. main nodes, 2. incoming neighbours, 3. outgoing neighbours,
   4. edges inside the view, 5. merge detection, 6. tags,
   7. branching edges, 8. reconnection of disconnected components. *)
let fetch_agraph_with_view db base64 query =
  let { dom = domain ;
        lim = limit ;
        all_propagates = all_propagates } = query in

  (* Main nodes come from the date-limited table when a limit is set,
     and from the plain domain table otherwise. *)
  let main_view =
    match limit with
    | QUERY_NO_LIMIT -> view_name_domain
    | _ -> view_name_limit in

  (* 1. every id of the view becomes a REGULAR node *)
  let g_nodes =
    Sqlite3.fetch_f db
      "SELECT id FROM %s" main_view
      process_regular_node Viz_types.empty_agraph in

  (* 2. parents of view nodes that are not in the domain *)
  let g_in =
    Sqlite3.fetch_f db
      "SELECT parent, child \
       FROM %s AS D1 \
       JOIN revision_ancestry ON D1.id = child \
       LEFT OUTER JOIN %s AS D2 ON D2.id = parent \
       WHERE D2.id ISNULL AND parent != ''"
      main_view view_name_domain
      process_neighb_in g_nodes in

  (* 3. children of view nodes that are not in the domain *)
  let g_out =
    Sqlite3.fetch_f db
      "SELECT parent, child \
       FROM %s AS D1 \
       JOIN revision_ancestry ON D1.id = parent \
       LEFT OUTER JOIN %s AS D2 ON D2.id = child \
       WHERE D2.id ISNULL"
      main_view view_name_domain
      (process_neighb_out all_propagates db) g_in in

  (* 4. edges whose two ends are both in the view *)
  let g_ancestry =
    Sqlite3.fetch_f db
      "SELECT parent, child \
       FROM %s AS D1, revision_ancestry, %s AS D2 \
       WHERE D1.id = parent AND child = D2.id"
      main_view main_view
      process_ancestry g_out in

  (* 5. nodes with more than one parent become MERGE nodes *)
  find_merge_nodes g_ancestry ;

  (* 6. tagged nodes *)
  collect_tags db base64 main_view g_ancestry ;

  (* 7. branching edges: with a single queried branch they are already
     known; otherwise ask the database for edges whose ends share no
     'branch' cert value *)
  let g_branching =
    match domain with
    | QUERY_BRANCHES [ _ ] -> g_ancestry
    | _ ->
        Sqlite3.fetch_f db
          "SELECT parent, child \
           FROM %s, revision_ancestry \
           WHERE id = child \
           AND parent != '' \
           AND NOT EXISTS \
           (SELECT P.id FROM revision_certs AS C, revision_certs AS P \
           WHERE C.id = child AND P.id = parent \
           AND C.name = 'branch' AND P.name = 'branch' \
           AND C.value = P.value)"
          main_view
          process_branching_edge g_ancestry in

  (* 8. reconnect disconnected components, unless everything was asked *)
  match domain with
  | QUERY_ALL -> g_branching
  | _ -> Components.reconnect (fetch_children db) g_branching


(* Turn [s] into an SQL literal: a quoted base64 string when [base64] is
   set, and a hexadecimal blob literal (X'..') otherwise. *)
let encode_quote base64 s =
  match base64 with
  | true  -> "'" ^ monot_encode s ^ "'"
  | false -> "X'" ^ Viz_misc.hex_enc s ^ "'"

(* Run [f db base64 query] between the creation and the removal of the
   temporary tables describing [query], using [Viz_misc.bracket]:
   - the domain table lists the ids with a 'branch' cert (restricted to
     the queried branches, if any);
   - when a limit is set, the limit table lists the domain ids whose
     'date' cert satisfies the SQL function [date_p]. *)
let fetch_with_view query base64 db f =
  let { dom = domain ; lim = limit } = query in
  let limited =
    match limit with
    | QUERY_NO_LIMIT -> false
    | _ -> true in

  let create_id_index table =
    Sqlite3.exec_f db "CREATE INDEX %s__id ON %s (id)" table table in
  let drop_table table =
    Sqlite3.exec_f db "DROP TABLE %s" table in

  (* [date_p] is only registered for a QUERY_BETWEEN limit; it compares
     the (possibly decoded) cert value with both bounds. *)
  let install_date_filter () =
    match limit with
    | QUERY_BETWEEN (lower, upper) ->
        let in_range arg =
          let date = may_decode base64 (Sqlite3.value_text arg) in
          sql_of_bool (lower <= date && date <= upper) in
        Sqlite3.create_fun_1 db "date_p" in_range
    | _ -> () in

  let domain_sql =
    match domain with
    | QUERY_ALL ->
        Printf.sprintf
          "CREATE TEMP TABLE %s AS \
           SELECT DISTINCT id FROM revision_certs \
           WHERE name = 'branch'"
          view_name_domain
    | QUERY_BRANCHES branches ->
        let quoted = List.map (encode_quote base64) branches in
        Printf.sprintf
          "CREATE TEMP TABLE %s AS \
           SELECT DISTINCT id FROM revision_certs \
           WHERE name = 'branch' AND value IN (%s)"
          view_name_domain
          (String.concat ", " quoted) in

  let setup () =
    Sqlite3.exec db domain_sql ;
    create_id_index view_name_domain ;
    if limited then begin
      install_date_filter () ;
      Sqlite3.exec db
        (Printf.sprintf
           "CREATE TEMP TABLE %s AS \
            SELECT DISTINCT id FROM %s NATURAL JOIN revision_certs \
            WHERE name = 'date' AND date_p(value)"
           view_name_limit view_name_domain) ;
      create_id_index view_name_limit
    end in

  let teardown () =
    if limited then begin
      Sqlite3.delete_function db "date_p" ;
      drop_table view_name_limit
    end ;
    drop_table view_name_domain in

  Viz_misc.bracket
    ~before:setup
    ~action:(fun () -> f db base64 query)
    ~after:teardown
    ()

(* Fetch the ancestry graph for [query]. Any exception is printed on
   stderr and then re-raised unchanged. *)
let fetch_agraph query base64 db =
  let report exn =
    Printf.eprintf "fetch_agraph exn: %s\n%!"
      (Printexc.to_string exn) in
  try fetch_with_view query base64 db fetch_agraph_with_view
  with exn ->
    report exn ;
    raise exn

(* Gunzip [s] and parse the result as a revision set, with the rostered
   or the pre-rosters grammar depending on [rostered]. *)
let decode_and_parse_revision rostered s =
  let lexbuf = Lexing.from_string (Unzip.inflate_str ~kind:Unzip.GZip s) in
  if rostered
  then Revision_parser.revision_set Revision_lexer.lex lexbuf
  else Revision_parser.pre_rosters_revision_set Revision_lexer.lex lexbuf

(* Load the data of revision [id] from the [revisions] table and parse
   it. [List.hd] raises [Failure] when no row matches [id]. *)
let fetch_revision_set rostered b64 db id =
  let rows =
    Sqlite3.fetch_v db
      "SELECT data FROM revisions WHERE id = ?" [`TEXT id]
      (acc_one_col b64) [] in
  decode_and_parse_revision rostered (List.hd rows)

(* Check [signature] against the text "[name@id:base64(v)]" with the
   public key registered under [keypair].
   Returns SIG_OK or SIG_BAD according to the check, and SIG_UNKNOWN
   when [Not_found] is raised (in particular when the key is not in
   [pubkeys]). *)
let verify_cert_sig pubkeys keypair name id v signature =
  try
    let pubkey = fst (Hashtbl.find pubkeys keypair) in
    let signed_text =
      Printf.sprintf "[%s@%s:%s]" name id (Base64.encode v) in
    let valid = Crypto.rsa_sha1_verify pubkey signed_text signature in
    if valid then SIG_OK else SIG_BAD
  with Not_found -> SIG_UNKNOWN

(* Row handler: build a cert record from the current row (id, name,
   value, keypair, signature), with its signature status computed by
   [verify_cert_sig], and push it onto [acc]. *)
let process_certs pubkeys b64 acc s =
  let id        = Sqlite3.column_text s 0 in
  let name      = Sqlite3.column_text s 1 in
  let value     = blob_col b64 s 2 in
  let signer    = Sqlite3.column_text s 3 in
  let signature = blob_col b64 s 4 in
  let verdict = verify_cert_sig pubkeys signer name id value signature in
  let cert =
    { c_id = id ;
      c_name = name ;
      c_value = value ;
      c_signer_id = signer ;
      c_signature = verdict } in
  cert :: acc

(* All certs attached to revision [id], each with its signature status.
   The list is in reverse fetch order. *)
let fetch_certs db pubkeys b64 id =
  let add_cert = process_certs pubkeys b64 in
  Sqlite3.fetch_v db
    "SELECT id, name, value, keypair, signature \
     FROM revision_certs WHERE id = ?" [`TEXT id]
    add_cert []
(* Prepared statement returning the signer (keypair) of the certs with a
   given revision id and cert name. *)
let prepare_fetch_one_cert_signer db =
  let sql = "SELECT keypair FROM revision_certs WHERE id = ? AND name = ?" in
  Sqlite3.prepare_one db sql
(* Prepared statement returning the value of the certs with a given
   revision id and cert name. *)
let prepare_fetch_one_cert_value db =
  let sql = "SELECT value FROM revision_certs WHERE id = ? AND name = ?" in
  Sqlite3.prepare_one db sql

(* Run the prepared statement [stmt] for ([id], [name]) and collect
   column 0 of every row. With kind [`VALUE_B64] each field goes through
   [monot_decode]; with [`SIGNER] or [`VALUE] it is kept as is. *)
let fetch_one_cert_field stmt id name kind =
  let convert =
    match kind with
    | `SIGNER | `VALUE -> (fun field -> field)
    | `VALUE_B64 -> monot_decode in
  let add_field acc row =
    convert (Sqlite3.column_text row 0) :: acc in
  Sqlite3.bind_fetch stmt [ `TEXT id ; `TEXT name ] add_field []

(* [(id, value)] pairs of the certs named [name] whose (possibly
   decoded) value satisfies [p], in fetch order. *)
let get_matching_cert db b64 name p =
  let keep acc row =
    let value = blob_col b64 row 1 in
    if p value
    then (Sqlite3.column_text row 0, value) :: acc
    else acc in
  let matches_rev =
    Sqlite3.fetch_v db
      "SELECT id, value FROM revision_certs WHERE name = ?"
      [`TEXT name]
      keep
      [] in
  List.rev matches_rev



(* Run [monotone_exe --db db_fname cmd...] as a subprocess, feeding it
   [input], and report the outcome through [cb]:
   - [`OUTPUT stdout] when the process exits with status 0;
   - [`SUB_PROC_ERROR msg] otherwise, where [msg] is stderr if it is not
     empty, or the exit status and collected exceptions.
   A "Running monotone ..." message is pushed on [status] for the
   duration of the run; it is popped by the reap callback, and also when
   the process could not be started at all (previously it was left on
   the status stack in that case).
   Raises through [Viz_types.errorf] when spawning fails with
   [Gspawn.Error]. *)
let spawn_monotone monotone_exe db_fname cmd input status cb =
  let cmd = monotone_exe :: "--db" :: db_fname :: cmd in
  if Viz_misc.debug "exec"
  then Printf.eprintf "### exec: Running '%s'\n%!" (String.concat " " cmd) ;
  status#push "Running monotone ..." ;
  try
    Subprocess.spawn
      ~encoding:`NONE ~cmd ~input
      ~reap_callback:status#pop
      (fun ~exceptions ~stdout ~stderr exit_code ->
        if exit_code = 0
        then
          cb (`OUTPUT stdout)
        else
          let error fmt =
            Printf.kprintf (fun s -> cb (`SUB_PROC_ERROR s)) fmt in
          if stderr = ""
          then
            error "Monotone exited with status %d:\n%s" exit_code
              (String.concat "\n" (List.map Printexc.to_string exceptions))
          else
            error "Monotone error:\n%s" stderr)
  with Gspawn.Error (_, msg) ->
    (* No process was started, so the reap callback will never run:
       undo the push ourselves before reporting the error. *)
    status#pop () ;
    Viz_types.errorf "Could not execute monotone:\n%s" msg







(* An opened monotone database, as built by [open_db]. *)
type t = {
    (* path the database was opened from *)
    filename  : string ;
    (* the SQLite connection *)
    db        : Sqlite3.db ;
    (* key id -> (decoded public key, ROWID - 1), filled by [fetch_pubkeys] *)
    pubkeys   : (string, Crypto.pub_rsa_key * int) Hashtbl.t ;
    (* prepared statements: index 0 fetches a cert signer, index 1 a cert value *)
    stmts     : Sqlite3.stmt array ;
    (* result of [has_rosters] *)
    rostered  : bool ;
    (* result of [uses_base64]: whether stored values need base64 decoding *)
    base64    : bool ;
    (* result of [schema_id] *)
    schema_id : string
  }


(* Apply [f] to the SQLite connection of [db]. LOCKED and BUSY errors
   are re-raised untouched; every other [Sqlite3.Error] is reported
   through [Viz_types.errorf] with the database file name. *)
let sqlite_try f db =
  try f db.db
  with Sqlite3.Error (code, msg) as exn ->
    match code with
    | Sqlite3.LOCKED | Sqlite3.BUSY -> raise exn
    | _ ->
        Viz_types.errorf "Error processing database %s:\n%s" db.filename msg




(* Open the monotone database stored in file [fname] and gather what the
   rest of this module needs: prepared statements, roster/base64 flags,
   schema id and public keys.
   Errors are reported through [Viz_types.errorf]: missing file, failure
   to open, or any [Sqlite3.Error] during setup (in which case the
   connection is closed first). *)
let open_db ?busy_handler fname =
  if not (Sys.file_exists fname)
  then Viz_types.errorf "No such file: %s" fname ;
  let handle =
    try Sqlite3.open_db fname
    with Sqlite3.Error (_, msg) ->
      Viz_types.errorf "Could not open database %s:\n%s" fname msg in
  let keys = Hashtbl.create 17 in
  let load () =
    setup_sqlite ?busy_handler handle ;
    let prepared = [| prepare_fetch_one_cert_signer handle ;
                      prepare_fetch_one_cert_value handle |] in
    let rostered = has_rosters handle in
    let schema   = schema_id handle in
    let base64   = uses_base64 rostered schema in
    fetch_pubkeys handle base64 keys ;
    { filename  = fname ;
      db        = handle ;
      pubkeys   = keys ;
      stmts     = prepared ;
      rostered  = rostered ;
      base64    = base64 ;
      schema_id = schema } in
  try load ()
  with Sqlite3.Error (_, msg) ->
    Sqlite3.close_db handle ;
    Viz_types.errorf "Error processing database %s:\n%s" fname msg

(* Close the database. The prepared statements created by [open_db] are
   finalized first: the original bound [stmts] without using it, leaving
   the statements alive when the connection was closed.
   NOTE(review): relies on the Sqlite3 binding exposing
   [finalize_stmt : stmt -> unit] -- confirm against sqlite3.mli. *)
let close_db { db = db ; stmts = stmts } =
  Array.iter Sqlite3.finalize_stmt stmts ;
  Sqlite3.close_db db

(* Run [f db] with [prg] installed as SQLite progress handler (period
   2000). The handler is removed again whether [f] returns or raises. *)
let with_progress prg f db =
  let remove_handler () = Sqlite3.progress_handler_unset db.db in
  Sqlite3.progress_handler_set db.db 2000 prg ;
  try
    let result = f db in
    remove_handler () ;
    result
  with exn ->
    remove_handler () ;
    raise exn

(* File name the database was opened from. *)
let get_filename { filename = filename } = filename

(* Public wrapper: list the branches of [db] with their cert counts,
   with SQLite errors handled by [sqlite_try]. *)
let fetch_branches db =
  let list_branches handle = fetch_branches db.base64 handle in
  sqlite_try list_branches db

(* Public wrapper: build the ancestry graph of [db] for [query], with
   SQLite errors handled by [sqlite_try]. *)
let fetch_ancestry_graph db query =
  let build handle = fetch_agraph query db.base64 handle in
  sqlite_try build db

(* Load and parse revision [id]. The result has an empty cert list; each
   edge of the revision set is reduced to its (old revision, change set)
   pair. A [Parsing.Parse_error] is reported through
   [Viz_types.errorf]. *)
let fetch_revision d id =
  try
    let (manifest_id, edges) =
      sqlite_try
        (fun db -> fetch_revision_set d.rostered d.base64 db id)
        d in
    let edge_pair e =
      (e.Revision_types.old_revision, e.Revision_types.change_set) in
    { revision_id  = id ;
      manifest_id  = manifest_id ;
      revision_set = List.map edge_pair edges ;
      certs        = [] }
  with Parsing.Parse_error ->
    Viz_types.errorf "Error while parsing revision set of %s" id

(* Same as [fetch_revision], with the [certs] field filled from
   [fetch_certs]. *)
let fetch_certs_and_revision d id =
  let revision = fetch_revision d id in
  let certs =
    sqlite_try (fun db -> fetch_certs db d.pubkeys d.base64 id) d in
  { revision with certs = certs }

(* Signers of the certs named [name] on revision [id], using the first
   prepared statement of [db]. *)
let fetch_cert_signer db id name =
  let signer_stmt = db.stmts.(0) in
  sqlite_try
    (fun _ -> fetch_one_cert_field signer_stmt id name `SIGNER)
    db

(* Values of the certs named [name] on revision [id], using the second
   prepared statement of [db]; values are base64-decoded when the
   database uses base64. *)
let fetch_cert_value db id name =
  let run kind =
    sqlite_try
      (fun _ -> fetch_one_cert_field db.stmts.(1) id name kind)
      db in
  if db.base64 then run `VALUE_B64 else run `VALUE

(* Row number stored with public key [id].
   Raises [Not_found] when the key is not in the table. *)
let get_key_rowid { pubkeys = keys } id =
  snd (Hashtbl.find keys id)

(* [(id, tag)] pairs of the 'tag' certs whose value satisfies [p]. *)
let get_matching_tags db p =
  let { db = handle ; base64 = base64 } = db in
  get_matching_cert handle base64 "tag" p

(* [(id, date)] pairs of the 'date' certs whose value starts with
   [d_pref]. *)
let get_matching_dates db d_pref =
  let starts_with_prefix = string_is_prefix d_pref in
  get_matching_cert db.db db.base64 "date" starts_with_prefix

(* [(id, value)] pairs of the 'branch' certs whose value starts with
   [id_pref].
   NOTE(review): despite the name, the prefix is tested against the
   'branch' cert value, not against the revision id -- confirm this is
   what callers expect. *)
let get_matching_ids db id_pref =
  let starts_with_prefix = string_is_prefix id_pref in
  get_matching_cert db.db db.base64 "branch" starts_with_prefix

(* Spawn "monotone diff" between revisions [old_id] and [new_id]; the
   outcome is delivered to [cb] and the value returned by
   [spawn_monotone] is discarded. *)
let run_monotone_diff db monotone_exe status cb (old_id, new_id) =
  let args =
    [ "--revision" ; old_id ;
      "--revision" ; new_id ; "diff" ] in
  ignore (spawn_monotone monotone_exe db.filename args None status cb)


(* Encode one "select" command per selector, each on its own line, in
   the form  l6:select<len>:<selector>e  and log the result on the
   "stdio" channel. *)
let encode_automate_stdio selectors =
  let buf = Buffer.create 512 in
  let add_command selector =
    Buffer.add_string buf "l6:select" ;
    Buffer.add_string buf (string_of_int (String.length selector)) ;
    Buffer.add_char buf ':' ;
    Buffer.add_string buf selector ;
    Buffer.add_string buf "e\n" in
  List.iter add_command selectors ;
  let encoded = Buffer.contents buf in
  Viz_misc.log "stdio" "stdio input: %S" encoded ;
  encoded

(* Decode the output of "automate stdio". [s] is a sequence of packets
   of the form  <number>:<code>:<last>:<len>:<payload>  where <code> is
   a single digit and <last> is 'l' on the final packet of a command.
   Payloads of consecutive packets are concatenated until a final packet
   is seen.

   Returns the list of (command number, output) pairs, in input order.
   Raises [Failure] with the accumulated output when a command ends with
   a non-zero code, and [Failure] as well when [s] is malformed or
   truncated.

   Fix: the payload of a non-final packet used to be appended to the
   buffer twice, which duplicated data in multi-packet outputs. *)
let decode_automate_stdio s =
  let rec loop acc cmd_buf i =
    if i >= String.length s
    then List.rev acc
    else begin
      (* locate the four ':' separators of the packet header *)
      let c1 = String.index_from s i ':' in
      let number = int_of_string (string_slice ~s:i ~e:c1 s) in
      let code   = int_of_char s.[c1 + 1] - int_of_char '0' in
      let c2 = String.index_from s (c1 + 1) ':' in
      let last   = s.[c2 + 1] in
      let c3 = String.index_from s (c2 + 1) ':' in
      let c4 = String.index_from s (c3 + 1) ':' in
      let len   = int_of_string (string_slice ~s:(c3 + 1) ~e:c4 s) in
      (* append this packet's payload exactly once *)
      Buffer.add_substring cmd_buf s (c4 + 1) len ;
      let next = c4 + 1 + len in

      match code with
      | 0 when last = 'l' ->
          let output = Buffer.contents cmd_buf in
          Buffer.clear cmd_buf ;
          loop ((number, output) :: acc) cmd_buf next
      | _ when last = 'l' ->
          let msg = Buffer.contents cmd_buf in
          Viz_misc.log "stdio" "got a stdio error (code=%d): %S" code msg ;
          failwith msg
      | _ ->
          (* more packets follow for this command *)
          loop acc cmd_buf next
    end in
  (* A missing separator or an out-of-range length means the stream is
     malformed; report it as a [Failure] like the other decoding errors. *)
  try loop [] (Buffer.create 1024) 0
  with Not_found | Invalid_argument _ ->
    failwith "malformed automate stdio output"


(* Split the output of every command into lines, gather all the lines
   and pass them through [Viz_misc.list_uniq]. *)
let collect_ids stdio_output =
  let prepend_lines acc (_, output) =
    string_split '\n' output @ acc in
  let all_lines = List.fold_left prepend_lines [] stdio_output in
  Viz_misc.list_uniq all_lines

(* Run the given selectors through "monotone automate stdio" and call
   [cb] with [`IDS ids] on success, or with [`SUB_PROC_ERROR msg] when
   the subprocess failed or its output could not be decoded (a
   [Failure] raised while decoding). *)
let run_monotone_select db monotone_exe status cb selectors =
  let parse_ids raw =
    try `IDS (collect_ids (decode_automate_stdio raw))
    with Failure msg -> `SUB_PROC_ERROR msg in
  let on_result = function
    | `OUTPUT raw -> cb (parse_ids raw)
    | `SUB_PROC_ERROR _ as err -> cb err in
  spawn_monotone
    monotone_exe db.filename [ "automate" ; "stdio" ]
    (Some (encode_automate_stdio selectors))
    status
    on_result