Below is the file 'database.ml' from this revision. You can also download the file.
open Viz_misc
open Viz_types

(* Evaluate [Sqlite3.init] (a unit value) when this module is loaded.
   NOTE(review): presumably this triggers the initialisation of the
   sqlite binding -- confirm in Sqlite3. *)
let () = Sqlite3.init

(* Base64-encode [s] through [Base64.encode] with [~linelength:72]. *)
let monot_encode s = Base64.encode ~linelength:72 s

(* Base64-decode [s] through [Base64.decode] with [~accept_spaces:true]. *)
let monot_decode s = Base64.decode ~accept_spaces:true s

(* Split [s] on the single-quote character (with [~collapse:false]) and
   rejoin the pieces with two single quotes, i.e. double every single
   quote of [s].
   NOTE(review): no caller is visible in this file; the queries below
   that splice an id or a cert name with %s do so without escaping. *)
let sql_escape s =
  String.concat "''" (string_split ~collapse:false '\'' s)

(* Return [v] decoded with [monot_decode] when [base64] is true, and [v]
   unchanged otherwise. *)
let may_decode base64 v =
  if base64 then monot_decode v else v

(* Row callback: cons the (possibly decoded) first column of [row] onto
   [acc]. *)
let acc_one_col base64 acc row =
  may_decode base64 row.(0) :: acc

(* Per-connection setup: install an SQL trace printer on stderr when the
   sql debug flag is set, select the in-memory temp store, and install
   [busy_handler] if one was given. *)
let setup_sqlite ?busy_handler db =
  if Viz_misc.debug "sql"
  then
    Sqlite3.trace_set db
      (fun s -> prerr_string "### sql: " ; prerr_endline s) ;
  Sqlite3.exec db "PRAGMA temp_store = MEMORY" ;
  may (Sqlite3.busy_set db) busy_handler

(* Compute an identifier for the database schema: the SQL text of all
   tables and indexes (except those whose name starts with sqlite_stat),
   joined with newlines, passed through
   [Schema_lexer.massage_sql_tokens], hashed with SHA-1 and rendered
   with [Viz_misc.hex_enc]. *)
let schema_id db =
  let lines =
    Sqlite3.fetch db
      "SELECT sql FROM sqlite_master \
       WHERE (type = 'table' OR type = 'index') \
       AND sql IS NOT NULL \
       AND name NOT LIKE 'sqlite_stat%' \
       ORDER BY name"
      (acc_one_col false) [] in
  (* rows are accumulated by consing, hence the [List.rev] *)
  let schema_data = String.concat "\n" (List.rev lines) in
  let schema = Schema_lexer.massage_sql_tokens schema_data in
  Viz_misc.hex_enc (Crypto.sha1 schema)

(* [true] iff sqlite_master has an entry named rosters. *)
let has_rosters db =
  Sqlite3.fetch db
    "SELECT name FROM sqlite_master WHERE name = 'rosters'"
    (fun _ _ -> true) false

(* Decide whether column values are stored base64-encoded: true for
   databases without rosters, and for the one schema id listed here. *)
let uses_base64 rosters schema_id =
  not rosters || schema_id = "1db80c7cee8fa966913db1a463ed50bf1b0e5b0e"

(* Fill [tbl] with one binding per row of public_keys: key id ->
   (decoded RSA public key, ROWID - 1).  Rows for which decoding raises
   [Failure] are skipped, as are rows of unexpected shape. *)
let fetch_pubkeys db base64 tbl =
  Sqlite3.fetch db
    "SELECT id, keydata, ROWID FROM public_keys"
    (fun () -> function
      | [| id; data; rowid |] ->
          begin
            try
              let data = may_decode base64 data in
              let key = Crypto.decode_rsa_pubkey data in
              Hashtbl.add tbl id (key, int_of_string rowid - 1)
            with Failure _ -> ()
          end
      | _ -> ())
    ()

(* Return the sorted list of distinct branch cert values (decoded when
   [base64] is true).  Shadowed further down by the wrapper that takes a
   [t]. *)
let fetch_branches base64 db =
  List.sort compare
    (Sqlite3.fetch db
       "SELECT DISTINCT value FROM revision_certs WHERE name = 'branch'"
       (acc_one_col base64) [])

(* Names of the temporary tables created by [fetch_with_view]. *)
let view_name_domain = "ids_of_branch"
let view_name_limit  = "ids_of_branch_with_date_limit"

(* Map a boolean to the SQL integer value 1 or 0. *)
let sql_of_bool b =
  if b then `INT 1 else `INT 0

(* Add [v] to the id set [s] when [t] holds and [v] is not empty. *)
let id_set_add_if t v s =
  if t && v <> "" then IdSet.add v s else s

(* Record in [nodes] that node [id] is related to [rel_id] by [rel].
   An existing node gets the pair consed onto its [family] unless
   [rel_id] is already listed there; a missing node is created with the
   given [kind] and that single relation. *)
let add_node kind id rel_id rel nodes =
  try
    let current_node = NodeMap.find id nodes in
    if List.mem_assoc rel_id current_node.family
    then nodes
    else
      NodeMap.add id
        { current_node with
          family = (rel_id, rel) :: current_node.family }
        nodes
  with Not_found ->
    NodeMap.add id
      { id = id ; kind = kind ; family = [ rel_id, rel ] }
      nodes

(* Add the edge [parent] -> [child] to the graph [g]: both end points
   are registered with [add_node], the edge is SAME_BRANCH when both
   kinds are REGULAR and BRANCHING otherwise, and any non-REGULAR end
   point is added to [neighbour_nodes].  At least one end point must be
   REGULAR and both ids must be non-empty (checked by assertions). *)
let process_ancestry_row g parent parent_kind child child_kind =
  assert (parent_kind = REGULAR || child_kind = REGULAR) ;
  assert (parent <> "" && child <> "") ;
  { nodes =
      (add_node parent_kind parent child CHILD (
       add_node child_kind child parent PARENT g.nodes)) ;
    ancestry =
      EdgeMap.add
        (parent, child)
        (if parent_kind = REGULAR && child_kind = REGULAR
         then SAME_BRANCH
         else BRANCHING)
        g.ancestry ;
    neighbour_nodes =
      (id_set_add_if (parent_kind <> REGULAR) parent (
       id_set_add_if (child_kind <> REGULAR) child g.neighbour_nodes))
  }

(* Row callback for (parent, child) rows.  An empty parent only makes
   sure [child] exists as a REGULAR node without relations; any other
   row is a REGULAR -> REGULAR edge.  Rows of another shape are
   ignored. *)
let process_ancestry_row_simple g = function
  | [| "" ; child |] ->
      if not (NodeMap.mem child g.nodes)
      then begin
        let new_node = { id = child ; kind = REGULAR ; family = [] } in
        let nodes = NodeMap.add child new_node g.nodes in
        { g with nodes = nodes }
      end
      else g
  | [| parent ; child |] ->
      process_ancestry_row g parent REGULAR child REGULAR
  | _ -> g

(* Row callback for (parent, child, flag) rows describing edges that
   leave the selected set.  Flag 0 makes [child] a NEIGHBOUR_OUT node;
   any other flag only records a SAME_BRANCH edge. *)
let process_ancestry_row_neigh_out g = function
  | [| parent ; child ; "0" |] ->
      process_ancestry_row g parent REGULAR child NEIGHBOUR_OUT
  | [| parent ; child ; _ |] ->
      { g with ancestry = EdgeMap.add (parent, child) SAME_BRANCH g.ancestry }
  | _ -> g

(* Row callback for (parent, child, flag) rows describing edges that
   enter the selected set.  Flag 0 makes [parent] a NEIGHBOUR_IN node;
   any other flag only records a SAME_BRANCH edge. *)
let process_ancestry_row_neigh_in g = function
  | [| parent ; child ; "0" |] ->
      process_ancestry_row g parent NEIGHBOUR_IN child REGULAR
  | [| parent ; child ; _ |] ->
      { g with ancestry = EdgeMap.add (parent, child) SAME_BRANCH g.ancestry }
  | _ -> g

(* Count the PARENT entries in the family of [node]. *)
let number_of_parent node =
  List.fold_left
    (fun n -> function
      | (_, PARENT) -> n + 1
      | _ -> n)
    0 node.family

(* Return [nodes] with every node that has more than one parent
   re-bound with kind MERGE. *)
let find_merge_nodes nodes =
  NodeMap.fold
    (fun id node m ->
      if number_of_parent node > 1
      then NodeMap.add id { node with kind = MERGE } m
      else m)
    nodes nodes

(* Row callback: mark the edge (parent, child) as BRANCHING. *)
let process_branching_edge_row g = function
  | [| parent; child |] ->
      { g with ancestry = EdgeMap.add (parent, child) BRANCHING g.ancestry }
  | _ -> g

(* [fetch_children db] returns a folding function [fun id f init] over
   the children of revision [id].  The statement is prepared on first
   use and then reused (reset and re-bound) by every later call of the
   returned function. *)
let fetch_children db =
  let stmt =
    lazy
      (Sqlite3.prepare_one db
         "SELECT child FROM revision_ancestry WHERE parent = ?") in
  fun id f init ->
    let stmt = Lazy.force stmt in
    Sqlite3.reset stmt ;
    Sqlite3.bind stmt 1 (`TEXT id) ;
    Sqlite3.fold_rows
      (fun acc stmt -> f acc (Sqlite3.column_text stmt 0))
      init stmt

(* Return [nodes] where every node carrying a tag cert is re-bound with
   kind TAGGED v.  Each matching row re-binds the same id, so a node
   with several tag certs keeps only one of them.
   NOTE(review): the prepared statement is not explicitly released
   here -- confirm whether the binding needs that. *)
let collect_tags db base64 nodes =
  let stmt =
    Sqlite3.prepare_one db
      "SELECT value FROM revision_certs WHERE id = ? AND name = 'tag'" in
  NodeMap.fold
    (fun id node m ->
      Sqlite3.reset stmt ;
      Sqlite3.bind stmt 1 (`TEXT id) ;
      Sqlite3.fold_rows
        (fun m stmt ->
          let r = Sqlite3.column_blob stmt 0 in
          let v = may_decode base64 r in
          NodeMap.add id { node with kind = TAGGED v } m)
        m stmt)
    nodes nodes

(* Build the ancestry graph for a query, assuming the temporary tables
   named by [view_name_domain] (and [view_name_limit] when the query has
   a limit) already exist; see [fetch_with_view]. *)
let fetch_agraph_with_view db base64 (query, query_limit) =
  (* without a limit, the domain table plays both roles *)
  let view_name_limit =
    if query_limit <> QUERY_NO_LIMIT
    then view_name_limit
    else view_name_domain in

  let agraph = Viz_types.empty_agraph in

  (* grab all node ids and edges we're interested in *)
  let agraph =
    Sqlite3.fetch_f db
      process_ancestry_row_simple agraph
      "SELECT parent, child FROM revision_ancestry, %s \
       WHERE (parent = '' OR parent = id) AND child IN %s"
      view_name_limit view_name_limit in

  (* also grab neighbor nodes *)
  let agraph =
    Sqlite3.fetch_f db
      process_ancestry_row_neigh_out agraph
      "SELECT parent, child, child IN %s FROM revision_ancestry \
       WHERE parent IN %s AND child NOT IN %s"
      view_name_domain view_name_limit view_name_limit in
  let agraph =
    Sqlite3.fetch_f db
      process_ancestry_row_neigh_in agraph
      "SELECT parent, child, parent IN %s FROM revision_ancestry \
       WHERE child IN %s AND parent != '' AND parent NOT IN %s"
      view_name_domain view_name_limit view_name_limit in

  (* find merge/propagate nodes (they have more than one parent) *)
  let agraph =
    { agraph with nodes = find_merge_nodes agraph.nodes } in

  (* get tags *)
  let agraph =
    { agraph with nodes = collect_tags db base64 agraph.nodes } in

  (* determine the branching edges *)
  let agraph =
    begin
      match query with
      | QUERY_BRANCHES [ _ ] ->
          (* we already have the branching edges *)
          agraph
      | _ ->
          (* we need another database query *)
          Sqlite3.fetch_f db
            process_branching_edge_row agraph
            "SELECT parent, child \
             FROM revision_ancestry AS A \
             WHERE A.child IN %s AND A.parent != '' AND \
             NOT EXISTS \
             (SELECT P.id FROM revision_certs AS C, revision_certs AS P \
             WHERE C.id = A.child AND P.id = A.parent \
             AND C.name = 'branch' AND P.name = 'branch' \
             AND C.value = P.value)"
            view_name_limit
    end in

  (* reconnect disconnected components *)
  let agraph =
    if query = QUERY_ALL
    then agraph
    else Components.reconnect (fetch_children db) agraph in

  agraph

(* Render [s] as an SQL literal: a quoted base64 string when [base64] is
   true, an X-prefixed quoted hex string otherwise. *)
let encode_quote base64 s =
  if base64
  then String.concat "" [ "\'" ; monot_encode s ; "\'" ]
  else String.concat "" [ "X\'" ; Viz_misc.hex_enc s ; "\'" ]

(* Run [f db base64 query] between the creation and the removal of the
   temporary tables that hold the revision ids selected by [query].
   The three phases are handed to [Viz_misc.bracket].
   NOTE(review): presumably [bracket] runs [~after] even when [~action]
   raises -- confirm in Viz_misc. *)
let fetch_with_view query base64 db f =
  let (query_domain, query_limit) = query in

  (* SQL function date_p: is the (decoded) value within [d1, d2] ? *)
  let register_date_p () =
    match query_limit with
    | QUERY_BETWEEN (d1, d2) ->
        Sqlite3.create_fun_1 db "date_p"
          (fun arg ->
            let d =
              if base64
              then monot_decode (Sqlite3.value_text arg)
              else Sqlite3.value_blob arg in
            sql_of_bool (d1 <= d && d <= d2))
    | _ -> () in

  let view_query_domain =
    match query_domain with
    | QUERY_ALL ->
        Printf.sprintf
          "CREATE TEMP TABLE %s AS \
           SELECT DISTINCT id FROM revision_certs \
           WHERE name = 'branch'"
          view_name_domain
    | QUERY_BRANCHES q ->
        Printf.sprintf
          "CREATE TEMP TABLE %s AS \
           SELECT DISTINCT id FROM revision_certs \
           WHERE name = 'branch' AND value IN (%s)"
          view_name_domain
          (String.concat ", " (List.map (encode_quote base64) q)) in

  let view_query_date_limit () =
    Printf.sprintf
      "CREATE TEMP TABLE %s AS \
       SELECT DISTINCT id FROM revision_certs \
       WHERE name = 'date' AND id IN %s AND date_p(value)"
      view_name_limit view_name_domain in

  Viz_misc.bracket
    ~before:(fun () ->
      (* We fetch the ids matching the query (ie those on certain branches) *)
      (* and store them in a view. *)
      Sqlite3.exec db view_query_domain ;
      Sqlite3.exec_f db "CREATE INDEX %s__id ON %s (id)"
        view_name_domain view_name_domain ;
      if query_limit <> QUERY_NO_LIMIT
      then begin
        register_date_p () ;
        Sqlite3.exec db (view_query_date_limit ()) ;
        Sqlite3.exec_f db "CREATE INDEX %s__id ON %s (id)"
          view_name_limit view_name_limit
      end)
    ~action:(fun () -> f db base64 query)
    ~after:(fun () ->
      if query_limit <> QUERY_NO_LIMIT
      then begin
        Sqlite3.delete_function db "date_p" ;
        Sqlite3.exec_f db "DROP TABLE %s" view_name_limit
      end ;
      Sqlite3.exec_f db "DROP TABLE %s" view_name_domain)
    ()

(* Fetch the ancestry graph for [query].  Any exception is printed on
   stderr and then re-raised. *)
let fetch_agraph query base64 db =
  try fetch_with_view query base64 db fetch_agraph_with_view
  with exn ->
    Printf.eprintf "fetch_agraph exn: %s\n%!" (Printexc.to_string exn) ;
    raise exn

(* Inflate the gzip data [s] and parse it with the revision parser
   matching the database flavour ([rostered] or pre-rosters). *)
let decode_and_parse_revision rostered s =
  let revision_parser =
    if rostered
    then Revision_parser.revision_set
    else Revision_parser.pre_rosters_revision_set in
  revision_parser
    Revision_lexer.lex
    (Lexing.from_string (Unzip.inflate_str ~kind:Unzip.GZip s))

(* Load and parse the revision [id].
   NOTE(review): [List.hd] raises [Failure] when no row matches [id];
   the callers visible in this file do not handle that case. *)
let fetch_revision_set rostered b64 db id =
  decode_and_parse_revision rostered
    (List.hd
       (Sqlite3.fetch_f db (acc_one_col b64) []
          "SELECT data FROM revisions WHERE id = '%s'" id))

(* Check [signature] against the bracketed text name@id:base64(value)
   with the public key registered under [keypair].  Returns SIG_OK or
   SIG_BAD, or SIG_UNKNOWN when the key is not in [pubkeys]. *)
let verify_cert_sig pubkeys keypair name id v signature =
  try
    let (pubkey, _) = Hashtbl.find pubkeys keypair in
    if Crypto.rsa_sha1_verify
        pubkey
        (Printf.sprintf "[%s@%s:%s]" name id (Base64.encode v))
        signature
    then SIG_OK
    else SIG_BAD
  with Not_found -> SIG_UNKNOWN

(* Row callback: turn an (id, name, value, keypair, signature) row into
   a cert record, with its signature status, and cons it onto [acc]. *)
let process_certs pubkeys b64 acc = function
  | [| id; name; v; keypair; signature |] ->
      let dec_v = may_decode b64 v in
      let dec_sig = may_decode b64 signature in
      { c_id = id ;
        c_name = name ;
        c_value = dec_v ;
        c_signer_id = keypair ;
        c_signature =
          verify_cert_sig pubkeys keypair name id dec_v dec_sig }
      :: acc
  | _ -> acc

(* Return all certs attached to revision [id]. *)
let fetch_certs db pubkeys b64 id =
  Sqlite3.fetch_f db
    (process_certs pubkeys b64) []
    "SELECT id, name, value, keypair, signature \
     FROM revision_certs WHERE id = '%s'" id

(* Prepared statement selecting the signer of the certs of a given
   revision id and cert name. *)
let prepare_fetch_one_cert_signer db =
  Sqlite3.prepare_one db
    "SELECT keypair FROM revision_certs WHERE id = ? AND name = ?"

(* Prepared statement selecting the value of the certs of a given
   revision id and cert name. *)
let prepare_fetch_one_cert_value db =
  Sqlite3.prepare_one db
    "SELECT value FROM revision_certs WHERE id = ? AND name = ?"

(* Run one of the two statements above for ([id], [name]) and collect
   the first column of every row.  [kind] selects whether the text is
   returned as is or base64-decoded. *)
let fetch_one_cert_field stmt id name kind =
  Sqlite3.reset stmt ;
  Sqlite3.bind stmt 1 (`TEXT id) ;
  Sqlite3.bind stmt 2 (`TEXT name) ;
  Sqlite3.fold_rows
    (fun acc stmt ->
      let v = Sqlite3.column_text stmt 0 in
      match kind with
      | `SIGNER
      | `VALUE -> v :: acc
      | `VALUE_B64 -> monot_decode v :: acc)
    [] stmt

(* Return the (id, value) pairs of the certs called [name] whose
   decoded value satisfies [p]. *)
let get_matching_cert db b64 name p =
  List.rev
    (Sqlite3.fetch_f db
       (fun acc -> function
         | [| id; v |] ->
             let dv = may_decode b64 v in
             if p dv then (id, dv) :: acc else acc
         | _ -> acc)
       []
       "SELECT id, value FROM revision_certs WHERE name = '%s'" name)

(* Start [monotone_exe --db db_fname cmd...] as a subprocess fed with
   [input].  On exit, [cb] receives either the standard output (exit
   status 0) or an error message built from stderr, or from the exit
   status and collected exceptions when stderr is empty.  A status
   message is pushed on [status] before spawning and [status#pop] is
   given as the reap callback.
   NOTE(review): when [Gspawn.Error] is raised the pushed message does
   not appear to be popped -- confirm against Subprocess.spawn. *)
let spawn_monotone monotone_exe db_fname cmd input status cb =
  let cmd = monotone_exe :: "--db" :: db_fname :: cmd in
  if Viz_misc.debug "exec"
  then Printf.eprintf "### exec: Running '%s'\n%!" (String.concat " " cmd) ;
  try
    status#push "Running monotone ..." ;
    Subprocess.spawn
      ~encoding:`NONE ~cmd ~input
      ~reap_callback:status#pop
      (fun ~exceptions ~stdout ~stderr status ->
        (* here [status] is the exit status of the subprocess *)
        if status = 0
        then
          cb (`OUTPUT stdout)
        else
          let error fmt =
            Printf.kprintf (fun s -> cb (`SUB_PROC_ERROR s)) fmt in
          if stderr = ""
          then
            error "Monotone exited with status %d:\n%s" status
              (String.concat "\n" (List.map Printexc.to_string exceptions))
          else
            error "Monotone error:\n%s" stderr)
  with Gspawn.Error (_, msg) ->
    Viz_types.errorf "Could not execute monotone:\n%s" msg

(* An opened monotone database. *)
type t = {
    filename  : string ;
    db        : Sqlite3.db ;
    (* key id -> (public key, ROWID - 1), filled by [fetch_pubkeys] *)
    pubkeys   : (string, Crypto.pub_rsa_key * int) Hashtbl.t ;
    (* index 0: cert signer statement, index 1: cert value statement *)
    stmts     : Sqlite3.stmt array ;
    rostered  : bool ;
    base64    : bool ;
    schema_id : string
  }

(* Apply [f] to the sqlite handle of [db].  LOCKED errors are re-raised
   unchanged; every other [Sqlite3.Error] is reported through
   [Viz_types.errorf] together with the database file name. *)
let sqlite_try f db =
  try f db.db
  with
  | Sqlite3.Error (Sqlite3.LOCKED, _) as exn -> raise exn
  | Sqlite3.Error (_, msg) ->
      Viz_types.errorf "Error processing database %s:\n%s" db.filename msg

(* Open the database file [fname] and gather what the rest of this
   module needs (prepared statements, schema flavour, public keys).
   Failures are reported through [Viz_types.errorf]; if an
   [Sqlite3.Error] occurs after the file was opened, the handle is
   closed first. *)
let open_db ?busy_handler fname =
  if not (Sys.file_exists fname)
  then Viz_types.errorf "No such file: %s" fname ;
  let db =
    try Sqlite3.open_db fname
    with Sqlite3.Error (_, msg) ->
      Viz_types.errorf "Could not open database %s:\n%s" fname msg in
  let pubkeys = Hashtbl.create 17 in
  try
    setup_sqlite ?busy_handler db ;
    let stmts =
      [| prepare_fetch_one_cert_signer db ;
         prepare_fetch_one_cert_value db |] in
    let rostered = has_rosters db in
    let schema = schema_id db in
    let base64 = uses_base64 rostered schema in
    fetch_pubkeys db base64 pubkeys ;
    { filename = fname ;
      db = db ;
      pubkeys = pubkeys ;
      stmts = stmts ;
      rostered = rostered ;
      base64 = base64 ;
      schema_id = schema }
  with Sqlite3.Error (_, msg) ->
    Sqlite3.close_db db ;
    Viz_types.errorf "Error processing database %s:\n%s" fname msg

(* Close the sqlite handle.
   NOTE(review): [stmts] is bound but not used, so the prepared
   statements are not explicitly released here -- confirm whether the
   binding requires it before closing. *)
let close_db { db = db ; stmts = stmts } =
  Sqlite3.close_db db

(* Run [f db] with the progress handler [prg] installed (with the
   argument 2000); the handler is removed again whether [f] returns or
   raises. *)
let with_progress prg f db =
  Sqlite3.progress_handler_set db.db 2000 prg ;
  try
    let r = f db in
    Sqlite3.progress_handler_unset db.db ;
    r
  with exn ->
    Sqlite3.progress_handler_unset db.db ;
    raise exn

(* File name the database was opened from. *)
let get_filename d = d.filename

(* Sorted list of branch names; errors go through [sqlite_try].  This
   definition shadows the lower-level [fetch_branches] above. *)
let fetch_branches db =
  sqlite_try (fetch_branches db.base64) db

(* Ancestry graph for [query]; errors go through [sqlite_try]. *)
let fetch_ancestry_graph db query =
  sqlite_try (fetch_agraph query db.base64) db

(* Load revision [id] without its certs.  A parse error is reported
   through [Viz_types.errorf]. *)
let fetch_revision d id =
  try
    let revision_set =
      sqlite_try
        (fun db -> fetch_revision_set d.rostered d.base64 db id)
        d in
    let (manifest_id, edges) = revision_set in
    { revision_id = id ;
      manifest_id = manifest_id ;
      revision_set =
        List.map
          (fun e ->
            (e.Revision_types.old_revision, e.Revision_types.change_set) )
          edges ;
      certs = [] }
  with Parsing.Parse_error ->
    Viz_types.errorf "Error while parsing revision set of %s" id

(* Load revision [id] together with its certs. *)
let fetch_certs_and_revision d id =
  { (fetch_revision d id) with
    certs = sqlite_try (fun db -> fetch_certs db d.pubkeys d.base64 id) d }

(* Signers of the certs called [name] on revision [id]. *)
let fetch_cert_signer db id name =
  sqlite_try
    (fun _ -> fetch_one_cert_field db.stmts.(0) id name `SIGNER)
    db

(* Values (decoded if the database uses base64) of the certs called
   [name] on revision [id]. *)
let fetch_cert_value db id name =
  let kind = if db.base64 then `VALUE_B64 else `VALUE in
  sqlite_try
    (fun _ -> fetch_one_cert_field db.stmts.(1) id name kind)
    db

(* Row number stored for the key [id]; raises [Not_found] for an
   unknown key. *)
let get_key_rowid { pubkeys = pubkeys } id =
  let (_, rowid) = Hashtbl.find pubkeys id in
  rowid

(* (id, value) pairs of the tag certs whose value satisfies [p]. *)
let get_matching_tags db p =
  get_matching_cert db.db db.base64 "tag" p

(* (id, value) pairs of the date certs whose value starts with
   [d_pref]. *)
let get_matching_dates db d_pref =
  get_matching_cert db.db db.base64 "date" (string_is_prefix d_pref)

(* (id, value) pairs of the branch certs whose value starts with
   [id_pref].
   NOTE(review): despite its name this filters branch cert values, not
   revision ids -- confirm that this is intended. *)
let get_matching_ids db id_pref =
  get_matching_cert db.db db.base64 "branch" (string_is_prefix id_pref)

(* Spawn a monotone diff between the revisions [old_id] and [new_id];
   the outcome is delivered to [cb]. *)
let run_monotone_diff db monotone_exe status cb (old_id, new_id) =
  ignore
    (spawn_monotone monotone_exe db.filename
       [ "--revision" ; old_id ; "--revision" ; new_id ; "diff" ]
       None status cb)

(* Encode one select command per selector as input for monotone
   automate stdio: the text l6:select followed by the length-prefixed
   selector, the letter e and a newline. *)
let encode_automate_stdio selectors =
  let b = Buffer.create 512 in
  List.iter
    (fun s ->
      Printf.bprintf b "l6:select" ;
      Printf.bprintf b "%d:%se\n" (String.length s) s)
    selectors ;
  let r = Buffer.contents b in
  Viz_misc.log "stdio" "stdio input: %S" r ;
  r

(* Decode the output [s] of monotone automate stdio into a list of
   (command number, output) pairs, in order of appearance.

   [s] is read as a sequence of packets made of four colon-terminated
   fields (command number, one-digit code, last marker, payload length)
   followed by the payload.  Payloads accumulate in [cmd_buf] until a
   packet whose marker is 'l' is seen: with code 0 the accumulated text
   is emitted, with any other code it is logged and raised as
   [Failure].

   Malformed input can also raise [Not_found] (missing colon),
   [Failure] (bad number) or [Invalid_argument] (truncated data). *)
let decode_automate_stdio s =
  let rec loop acc cmd_buf i =
    if i >= String.length s
    then List.rev acc
    else begin
      let c1 = String.index_from s i ':' in
      let number = int_of_string (string_slice ~s:i ~e:c1 s) in
      let code = int_of_char s.[c1 + 1] - int_of_char '0' in
      let c2 = String.index_from s (c1 + 1) ':' in
      let last = s.[c2 + 1] in
      let c3 = String.index_from s (c2 + 1) ':' in
      let c4 = String.index_from s (c3 + 1) ':' in
      let len = int_of_string (string_slice ~s:(c3 + 1) ~e:c4 s) in
      Buffer.add_substring cmd_buf s (c4 + 1) len ;
      match code with
      | 0 when last = 'l' ->
          let output = Buffer.contents cmd_buf in
          Buffer.clear cmd_buf ;
          loop ((number, output) :: acc) cmd_buf (c4 + 1 + len)
      | _ when last = 'l' ->
          let msg = Buffer.contents cmd_buf in
          Viz_misc.log "stdio" "got a stdio error (code=%d): %S" code msg ;
          failwith msg
      | _ ->
          (* The payload was already appended above; appending it a
             second time here duplicated every non-final chunk. *)
          loop acc cmd_buf (c4 + 1 + len)
    end in
  loop [] (Buffer.create 1024) 0

(* Split every command output into lines, gather them all and pass the
   result through [Viz_misc.list_uniq]. *)
let collect_ids stdio_output =
  Viz_misc.list_uniq
    (List.fold_left
       (fun acc (_, output) -> (string_split '\n' output) @ acc)
       []
       stdio_output)

(* Ask monotone (through automate stdio) for the revisions matching
   [selectors].  [cb] receives the ids, or an error message when the
   subprocess fails, reports an error, or produces output that cannot
   be decoded. *)
let run_monotone_select db monotone_exe status cb selectors =
  spawn_monotone monotone_exe db.filename
    [ "automate" ; "stdio" ]
    (Some (encode_automate_stdio selectors))
    status
    (function
      | `OUTPUT s ->
          let ids =
            try `IDS (collect_ids (decode_automate_stdio s))
            with
            | Failure msg -> `SUB_PROC_ERROR msg
            (* truncated or malformed packets: missing colon or payload
               shorter than announced *)
            | Not_found | Invalid_argument _ ->
                `SUB_PROC_ERROR
                  "Could not parse the output of monotone automate stdio" in
          cb ids
      | `SUB_PROC_ERROR _ as r -> cb r)