Below is the file 'database.ml' from this revision. You can also download the file.
open Viz_misc open Viz_types let () = Sqlite3.init let monot_encode s = Base64.encode ~linelength:72 s let monot_decode s = Base64.decode ~accept_spaces:true s let may_decode base64 v = if base64 then monot_decode v else v let blob_col base64 stmt n = may_decode base64 (Sqlite3.column_blob stmt n) let acc_one_col base64 acc stmt = blob_col base64 stmt 0 :: acc let setup_sqlite ?busy_handler db = if Viz_misc.debug "sql" then Sqlite3.trace_set db (fun s -> Printf.eprintf "### %2.3f sql: %s\n%!" (Sys.time ()) s) ; Sqlite3.exec db "PRAGMA temp_store = MEMORY" ; may (Sqlite3.busy_set db) busy_handler let schema_id db = let lines = Sqlite3.fetch db "SELECT sql FROM sqlite_master \ WHERE (type = 'table' OR type = 'index') \ AND sql IS NOT NULL \ AND name NOT LIKE 'sqlite_stat%' \ ORDER BY name" (acc_one_col false) [] in let schema_data = String.concat "\n" (List.rev lines) in let schema = Schema_lexer.massage_sql_tokens schema_data in Viz_misc.hex_enc (Crypto.sha1 schema) let has_rosters db = Sqlite3.fetch db "SELECT name FROM sqlite_master WHERE name = 'rosters'" (fun _ _ -> true) false let uses_base64 rosters schema_id = not rosters || schema_id = "1db80c7cee8fa966913db1a463ed50bf1b0e5b0e" let fetch_pubkeys db base64 tbl = Sqlite3.fetch db "SELECT id, keydata, ROWID FROM public_keys" (fun () stmt -> let id = Sqlite3.column_text stmt 0 in let data = blob_col base64 stmt 1 in let rowid = Sqlite3.column_int stmt 2 in try let key = Crypto.decode_rsa_pubkey data in Hashtbl.add tbl id (key, rowid - 1) with Failure _ -> ()) () let fetch_branches base64 db = List.sort compare (Sqlite3.fetch db "SELECT value, COUNT(id) FROM revision_certs WHERE name = 'branch' GROUP BY value" (fun acc s -> let b = blob_col base64 s 0 in let n = Sqlite3.column_int s 1 in (b, n) :: acc) []) let view_name_domain = "ids_of_branch" let view_name_limit = "ids_of_branch_with_date_limit" let sql_of_bool b = if b then `INT 1 else `INT 0 let get_relative n r = List.fold_left (fun acc -> function | (id, rel) when r = rel -> id :: acc | _ -> acc) [] n.family let count_parents n = List.length (get_relative n PARENT) let find_merge_nodes g = NodeMap.iter (fun id node -> if node.kind = REGULAR && count_parents node > 1 then node.kind <- MERGE) g.nodes let grab_one_int stmt id = Sqlite3.bind_fetch stmt [ `TEXT id ] (fun _ stmt -> Sqlite3.column_int stmt 0) 0 let count_all_parents db = let stmt = Sqlite3.prepare_one db "SELECT COUNT(parent) FROM revision_ancestry WHERE parent != '' AND child = ?" in fun id -> grab_one_int stmt id let count_regular_children db = let stmt = Sqlite3.prepare_one db (Printf.sprintf "SELECT COUNT(child) FROM revision_ancestry, %s WHERE parent = ? AND child = id" view_name_domain) in fun id -> grab_one_int stmt id let is_interesting_neighbour_out db = let count_p = count_all_parents db in let count_c = count_regular_children db in let start_of_branch id = count_p id = 1 in let end_of_branch p_id = count_c p_id = 0 in fun id_parent id -> start_of_branch id || end_of_branch id_parent let fetch_children db = let stmt = Sqlite3.prepare_one db "SELECT child FROM revision_ancestry WHERE parent = ?" in fun id f init -> Sqlite3.bind_fetch stmt [ `TEXT id] (fun acc stmt -> f acc (Sqlite3.column_text stmt 0)) init let collect_tags db base64 view g = Sqlite3.fetch_f db "SELECT C.id, C.value FROM revision_certs AS C, %s AS D WHERE name = 'tag' AND C.id = D.id" view (fun () stmt -> let id = Sqlite3.column_text stmt 0 in let n = NodeMap.find id g.nodes in let tag = blob_col base64 stmt 1 in n.kind <- TAGGED tag) () let ensure_node g id k = try NodeMap.find id g.nodes, g with Not_found -> let n = { id = id ; kind = k ; family = [] } in n, { g with nodes = NodeMap.add id n g.nodes } let connect_nodes n1 n2 = n1.family <- (n2.id, CHILD) :: n1.family ; n2.family <- (n1.id, PARENT) :: n2.family let add_edge g id1 id2 ek = { g with ancestry = EdgeMap.add (id1, id2) ek g.ancestry } let add_nodes_with_edge g id1 k1 id2 k2 ek = let n1, g = ensure_node g id1 k1 in let n2, g = ensure_node g id2 k2 in connect_nodes n1 n2 ; add_edge g id1 id2 ek let process_regular_node g s = let id = Sqlite3.column_text s 0 in let _, g = ensure_node g id REGULAR in g let process_neighb_in g s = let id = Sqlite3.column_text s 0 in let child = Sqlite3.column_text s 1 in assert (NodeMap.mem child g.nodes) ; add_nodes_with_edge g id NEIGHBOUR_IN child REGULAR BRANCHING_NEIGH let process_neighb_out all db = let is_interesting = is_interesting_neighbour_out db in fun g s -> let parent = Sqlite3.column_text s 0 in let id = Sqlite3.column_text s 1 in assert (NodeMap.mem parent g.nodes) ; if all || is_interesting parent id then add_nodes_with_edge g parent REGULAR id NEIGHBOUR_OUT BRANCHING_NEIGH else g let process_ancestry g s = let parent = Sqlite3.column_text s 0 in let child = Sqlite3.column_text s 1 in assert (NodeMap.mem parent g.nodes) ; assert (NodeMap.mem child g.nodes) ; assert (not (EdgeMap.mem (parent, child) g.ancestry)) ; add_nodes_with_edge g parent REGULAR child REGULAR SAME_BRANCH let process_branching_edge g s = let parent = Sqlite3.column_text s 0 in let child = Sqlite3.column_text s 1 in let e = parent, child in try if EdgeMap.find e g.ancestry = SAME_BRANCH then add_edge g parent child BRANCHING else g with Not_found -> g let fetch_agraph_with_view db base64 query = let { dom = query ; lim = query_limit ; all_propagates = all_propagates } = query in let view_name_limit = if query_limit <> QUERY_NO_LIMIT then view_name_limit else view_name_domain in let agraph = Viz_types.empty_agraph in (* grab all our main nodes *) let agraph = Sqlite3.fetch_f db "SELECT id FROM %s" view_name_limit process_regular_node agraph in (* neighbor IN *) let agraph = Sqlite3.fetch_f db "SELECT parent, child \ FROM %s AS D1 \ JOIN revision_ancestry ON D1.id = child \ LEFT OUTER JOIN %s AS D2 ON D2.id = parent \ WHERE D2.id ISNULL AND parent != ''" view_name_limit view_name_domain process_neighb_in agraph in (* neighbor OUT *) let agraph = Sqlite3.fetch_f db "SELECT parent, child \ FROM %s AS D1 \ JOIN revision_ancestry ON D1.id = parent \ LEFT OUTER JOIN %s AS D2 ON D2.id = child \ WHERE D2.id ISNULL" view_name_limit view_name_domain (process_neighb_out all_propagates db) agraph in (* ancestry *) let agraph = Sqlite3.fetch_f db "SELECT parent, child \ FROM %s AS D1, revision_ancestry, %s AS D2 \ WHERE D1.id = parent AND child = D2.id" view_name_limit view_name_limit process_ancestry agraph in (* find merge/propagate nodes (they have more than one parent) *) find_merge_nodes agraph ; (* get tags *) collect_tags db base64 view_name_limit agraph ; (* determine the branching edges *) let agraph = begin match query with | QUERY_BRANCHES [ _ ] -> (* we already have the branching edges *) agraph | _ -> (* we need another database query *) Sqlite3.fetch_f db "SELECT parent, child \ FROM %s, revision_ancestry \ WHERE id = child \ AND parent != '' \ AND NOT EXISTS \ (SELECT P.id FROM revision_certs AS C, revision_certs AS P \ WHERE C.id = child AND P.id = parent \ AND C.name = 'branch' AND P.name = 'branch' \ AND C.value = P.value)" view_name_limit process_branching_edge agraph end in (* reconnect disconnected components *) let agraph = if query = QUERY_ALL then agraph else Components.reconnect (fetch_children db) agraph in agraph let encode_quote base64 s = if base64 then String.concat "" [ "\'" ; monot_encode s ; "\'" ] else String.concat "" [ "X\'" ; Viz_misc.hex_enc s ; "\'" ] let fetch_with_view query base64 db f = let { dom = query_domain ; lim = query_limit } = query in let register_date_p () = match query_limit with | QUERY_BETWEEN (d1, d2) -> Sqlite3.create_fun_1 db "date_p" (fun arg -> let d = may_decode base64 (Sqlite3.value_text arg) in sql_of_bool (d1 <= d && d <= d2)) | _ -> () in let view_query_domain = match query_domain with | QUERY_ALL -> Printf.sprintf "CREATE TEMP TABLE %s AS \ SELECT DISTINCT id FROM revision_certs \ WHERE name = 'branch'" view_name_domain | QUERY_BRANCHES q -> Printf.sprintf "CREATE TEMP TABLE %s AS \ SELECT DISTINCT id FROM revision_certs \ WHERE name = 'branch' AND value IN (%s)" view_name_domain (String.concat ", " (List.map (encode_quote base64) q)) in let view_query_date_limit () = Printf.sprintf "CREATE TEMP TABLE %s AS \ SELECT DISTINCT id FROM %s NATURAL JOIN revision_certs \ WHERE name = 'date' AND date_p(value)" view_name_limit view_name_domain in Viz_misc.bracket ~before:(fun () -> (* We fetch the ids matching the query (ie those on certain branches) *) (* and store them in a view. *) Sqlite3.exec db view_query_domain ; Sqlite3.exec_f db "CREATE INDEX %s__id ON %s (id)" view_name_domain view_name_domain ; if query_limit <> QUERY_NO_LIMIT then begin register_date_p () ; Sqlite3.exec db (view_query_date_limit ()) ; Sqlite3.exec_f db "CREATE INDEX %s__id ON %s (id)" view_name_limit view_name_limit end) ~action:(fun () -> f db base64 query) ~after:(fun () -> if query_limit <> QUERY_NO_LIMIT then begin Sqlite3.delete_function db "date_p" ; Sqlite3.exec_f db "DROP TABLE %s" view_name_limit end ; Sqlite3.exec_f db "DROP TABLE %s" view_name_domain) () let fetch_agraph query base64 db = try fetch_with_view query base64 db fetch_agraph_with_view with exn -> Printf.eprintf "fetch_agraph exn: %s\n%!" (Printexc.to_string exn) ; raise exn let decode_and_parse_revision rostered s = let revision_parser = if rostered then Revision_parser.revision_set else Revision_parser.pre_rosters_revision_set in revision_parser Revision_lexer.lex (Lexing.from_string (Unzip.inflate_str ~kind:Unzip.GZip s)) let fetch_revision_set rostered b64 db id = decode_and_parse_revision rostered (List.hd (Sqlite3.fetch_v db "SELECT data FROM revisions WHERE id = ?" [`TEXT id] (acc_one_col b64) [])) let verify_cert_sig pubkeys keypair name id v signature = try let (pubkey, _) = Hashtbl.find pubkeys keypair in if Crypto.rsa_sha1_verify pubkey (Printf.sprintf "[%s@%s:%s]" name id (Base64.encode v)) signature then SIG_OK else SIG_BAD with Not_found -> SIG_UNKNOWN let process_certs pubkeys b64 acc s = let id = Sqlite3.column_text s 0 in let name = Sqlite3.column_text s 1 in let dec_v = blob_col b64 s 2 in let keypair = Sqlite3.column_text s 3 in let dec_sig = blob_col b64 s 4 in { c_id = id ; c_name = name ; c_value = dec_v ; c_signer_id = keypair ; c_signature = verify_cert_sig pubkeys keypair name id dec_v dec_sig } :: acc let fetch_certs db pubkeys b64 id = Sqlite3.fetch_v db "SELECT id, name, value, keypair, signature \ FROM revision_certs WHERE id = ?" [`TEXT id] (process_certs pubkeys b64) [] let prepare_fetch_one_cert_signer db = Sqlite3.prepare_one db "SELECT keypair FROM revision_certs WHERE id = ? AND name = ?" let prepare_fetch_one_cert_value db = Sqlite3.prepare_one db "SELECT value FROM revision_certs WHERE id = ? AND name = ?" let fetch_one_cert_field stmt id name kind = Sqlite3.bind_fetch stmt [ `TEXT id ; `TEXT name ] (fun acc stmt -> let v = Sqlite3.column_text stmt 0 in match kind with | `SIGNER | `VALUE -> v :: acc | `VALUE_B64 -> monot_decode v :: acc) [] let get_matching_cert db b64 name p = List.rev (Sqlite3.fetch_v db "SELECT id, value FROM revision_certs WHERE name = ?" [`TEXT name] (fun acc s -> let v = blob_col b64 s 1 in if p v then begin let id = Sqlite3.column_text s 0 in (id, v) :: acc end else acc) []) let spawn_monotone monotone_exe db_fname cmd input status cb = let cmd = monotone_exe :: "--db" :: db_fname :: cmd in if Viz_misc.debug "exec" then Printf.eprintf "### exec: Running '%s'\n%!" (String.concat " " cmd) ; try status#push "Running monotone ..." ; Subprocess.spawn ~encoding:`NONE ~cmd ~input ~reap_callback:status#pop (fun ~exceptions ~stdout ~stderr status -> if status = 0 then cb (`OUTPUT stdout) else let error fmt = Printf.kprintf (fun s -> cb (`SUB_PROC_ERROR s)) fmt in if stderr = "" then error "Monotone exited with status %d:\n%s" status (String.concat "\n" (List.map Printexc.to_string exceptions)) else error "Monotone error:\n%s" stderr) with Gspawn.Error (_, msg) -> Viz_types.errorf "Could not execute monotone:\n%s" msg type t = { filename : string ; db : Sqlite3.db ; pubkeys : (string, Crypto.pub_rsa_key * int) Hashtbl.t ; stmts : Sqlite3.stmt array ; rostered : bool ; base64 : bool ; schema_id : string } let sqlite_try f db = try f db.db with | Sqlite3.Error ((Sqlite3.LOCKED | Sqlite3.BUSY), _) as exn -> raise exn | Sqlite3.Error (_, msg) -> Viz_types.errorf "Error processing database %s:\n%s" db.filename msg let open_db ?busy_handler fname = if not (Sys.file_exists fname) then Viz_types.errorf "No such file: %s" fname ; let db = try Sqlite3.open_db fname with Sqlite3.Error (_, msg) -> Viz_types.errorf "Could not open database %s:\n%s" fname msg in let pubkeys = Hashtbl.create 17 in try setup_sqlite ?busy_handler db ; let stmts = [| prepare_fetch_one_cert_signer db ; prepare_fetch_one_cert_value db |] in let rostered = has_rosters db in let schema = schema_id db in let base64 = uses_base64 rostered schema in fetch_pubkeys db base64 pubkeys ; { filename = fname ; db = db ; pubkeys = pubkeys ; stmts = stmts ; rostered = rostered ; base64 = base64 ; schema_id = schema } with Sqlite3.Error (_, msg) -> Sqlite3.close_db db ; Viz_types.errorf "Error processing database %s:\n%s" fname msg let close_db { db = db ; stmts = stmts } = Sqlite3.close_db db let with_progress prg f db = Sqlite3.progress_handler_set db.db 2000 prg ; try let r = f db in Sqlite3.progress_handler_unset db.db ; r with exn -> Sqlite3.progress_handler_unset db.db ; raise exn let get_filename d = d.filename let fetch_branches db = sqlite_try (fetch_branches db.base64) db let fetch_ancestry_graph db query = sqlite_try (fetch_agraph query db.base64) db let fetch_revision d id = try let revision_set = sqlite_try (fun db -> fetch_revision_set d.rostered d.base64 db id) d in let (manifest_id, edges) = revision_set in { revision_id = id ; manifest_id = manifest_id ; revision_set = List.map (fun e -> (e.Revision_types.old_revision, e.Revision_types.change_set) ) edges ; certs = [] } with Parsing.Parse_error -> Viz_types.errorf "Error while parsing revision set of %s" id let fetch_certs_and_revision d id = { (fetch_revision d id) with certs = sqlite_try (fun db -> fetch_certs db d.pubkeys d.base64 id) d } let fetch_cert_signer db id name = sqlite_try (fun _ -> fetch_one_cert_field db.stmts.(0) id name `SIGNER) db let fetch_cert_value db id name = let kind = if db.base64 then `VALUE_B64 else `VALUE in sqlite_try (fun _ -> fetch_one_cert_field db.stmts.(1) id name kind) db let get_key_rowid { pubkeys = pubkeys } id = let (_, rowid) = Hashtbl.find pubkeys id in rowid let get_matching_tags db p = get_matching_cert db.db db.base64 "tag" p let get_matching_dates db d_pref = get_matching_cert db.db db.base64 "date" (string_is_prefix d_pref) let get_matching_ids db id_pref = get_matching_cert db.db db.base64 "branch" (string_is_prefix id_pref) let run_monotone_diff db monotone_exe status cb (old_id, new_id) = ignore (spawn_monotone monotone_exe db.filename [ "--revision" ; old_id ; "--revision" ; new_id ; "diff" ] None status cb) let encode_automate_stdio selectors = let b = Buffer.create 512 in List.iter (fun s -> Printf.bprintf b "l6:select" ; Printf.bprintf b "%d:%se\n" (String.length s) s) selectors ; let r = Buffer.contents b in Viz_misc.log "stdio" "stdio input: %S" r ; r let decode_automate_stdio s = let rec loop acc cmd_buf i = if i >= String.length s then List.rev acc else begin let c1 = String.index_from s i ':' in let number = int_of_string (string_slice ~s:i ~e:c1 s) in let code = int_of_char s.[c1 + 1] - int_of_char '0' in let c2 = String.index_from s (c1 + 1) ':' in let last = s.[c2 + 1] in let c3 = String.index_from s (c2 + 1) ':' in let c4 = String.index_from s (c3 + 1) ':' in let len = int_of_string (string_slice ~s:(c3 + 1) ~e:c4 s) in Buffer.add_substring cmd_buf s (c4 + 1) len ; match code with | 0 when last = 'l' -> let output = Buffer.contents cmd_buf in Buffer.clear cmd_buf ; loop ((number, output) :: acc) cmd_buf (c4 + 1 + len) | _ when last = 'l' -> let msg = Buffer.contents cmd_buf in Viz_misc.log "stdio" "got a stdio error (code=%d): %S" code msg ; failwith msg | _ -> Buffer.add_substring cmd_buf s (c4 + 1) len ; loop acc cmd_buf (c4 + 1 + len) end in loop [] (Buffer.create 1024) 0 let collect_ids stdio_output = Viz_misc.list_uniq (List.fold_left (fun acc (_, output) -> (string_split '\n' output) @ acc) [] stdio_output) let run_monotone_select db monotone_exe status cb selectors = spawn_monotone monotone_exe db.filename [ "automate" ; "stdio" ] (Some (encode_automate_stdio selectors)) status (function | `OUTPUT s -> let ids = try `IDS (collect_ids (decode_automate_stdio s)) with Failure msg -> `SUB_PROC_ERROR msg in cb ids | `SUB_PROC_ERROR _ as r -> cb r)