parent
0309737ef7
commit
5298108d1a
@ -0,0 +1,196 @@ |
||||
open Rresult.R.Infix |
||||
|
||||
let save_data outputdir (filepath, data) = |
||||
let localpath = Fpath.(outputdir // filepath) in |
||||
(* FIXME: return an error?! *) |
||||
let () = |
||||
match Bos.OS.File.exists localpath with |
||||
| Ok false -> |
||||
Logs.warn (fun m -> m "artifact file %a does not exist in %a" |
||||
Fpath.pp filepath Fpath.pp outputdir) |
||||
| Error (`Msg e) -> |
||||
Logs.warn (fun m -> m "artifact file error %a: %s" |
||||
Fpath.pp localpath e) |
||||
| Ok true -> () |
||||
in |
||||
(filepath, localpath, data) |
||||
|
||||
let get_by_uuid uuid (module Db : Caqti_blocking.CONNECTION) = |
||||
Uuidm.of_string uuid |> Option.to_result ~none:"bad uuid" >>= fun uuid -> |
||||
Db.find_opt Builder_db.Build.get_by_uuid uuid |
||||
|> Result.map_error (fun e -> |
||||
Fmt.strf "Error getting build %a: %a" Uuidm.pp uuid Caqti_error.pp e) |
||||
|
||||
let db_add_build (job, uuid, console, start, finish, result, artifacts) |
||||
(input_files : (Fpath.t * Fpath.t * string) list) |
||||
(module Db : Caqti_blocking.CONNECTION) = |
||||
let open Builder_db in |
||||
let job_name = job.Builder.name in |
||||
Db.exec Job.try_add job_name >>= fun () -> |
||||
Db.find Job.get_id_by_name job_name >>= fun job_id -> |
||||
Db.exec Build.add { Build.uuid; start; finish; result; console; |
||||
script = job.Builder.script; job_id } >>= fun () -> |
||||
Db.find last_insert_rowid () >>= fun id -> |
||||
List.fold_left |
||||
(fun r (filepath, localpath, data) -> |
||||
r >>= fun () -> |
||||
let sha256 = Mirage_crypto.Hash.SHA256.digest (Cstruct.of_string data) in |
||||
Db.exec Build_artifact.add ({ filepath; localpath; sha256 }, id)) |
||||
(Ok ()) |
||||
artifacts >>= fun () -> |
||||
List.fold_left |
||||
(fun r (filepath, localpath, data) -> |
||||
r >>= fun () -> |
||||
let sha256 = Mirage_crypto.Hash.SHA256.digest (Cstruct.of_string data) in |
||||
Db.exec Build_file.add ({ filepath; localpath; sha256 }, id)) |
||||
(Ok ()) |
||||
input_files |
||||
|
||||
let add_build conn builddir = |
||||
let f = Fpath.(builddir / "full") in |
||||
let outputdir = Fpath.(builddir / "output") in |
||||
let inputdir = Fpath.(builddir / "input") in |
||||
let uuid = Fpath.basename builddir in |
||||
match get_by_uuid uuid conn with |
||||
| Error e -> Logs.warn (fun m -> m "%s" e) |
||||
| Ok (Some _) -> Logs.debug (fun m -> m "Skipping %a, already in database" Fpath.pp builddir) |
||||
| Ok None -> |
||||
Logs.debug (fun m -> m "Adding build %a" Fpath.pp builddir); |
||||
match Bos.OS.File.read f with |
||||
| Error (`Msg e) -> |
||||
Logs.warn (fun m -> m "Error getting build %a: %s" |
||||
Fpath.pp builddir e) |
||||
| Ok contents -> |
||||
match Builder.Asn.exec_of_cs (Cstruct.of_string contents) with |
||||
| Error (`Msg e) -> |
||||
Logs.warn (fun m -> m "Error parsing build file %a: %s" |
||||
Fpath.pp f e) |
||||
| Ok (job, uuid, console, start, finish, result, data) -> |
||||
let data = List.map (save_data outputdir) data in |
||||
let input_files = List.map (save_data inputdir) job.Builder.files in |
||||
match db_add_build (job, uuid, console, start, finish, result, data) input_files conn with |
||||
| Error e -> |
||||
Logs.err (fun m -> m "Error inserting build %a: %a" |
||||
Fpath.pp builddir Caqti_error.pp e) |
||||
| Ok () -> () |
||||
|
||||
let add_job conn jobdir = |
||||
Logs.debug (fun m -> m "Adding job %a" Fpath.pp jobdir); |
||||
match Bos.OS.Dir.contents jobdir with |
||||
| Error (`Msg e) -> |
||||
Logs.warn (fun m -> |
||||
m "Error getting job %s: %s\n" (Fpath.basename jobdir) e) |
||||
| Ok builds -> |
||||
List.iter (add_build conn) builds |
||||
|
||||
let add_jobs conn datadir = |
||||
Bos.OS.Dir.contents datadir >>| |
||||
List.filter (fun f -> not Fpath.(equal (v "state") f)) >>| |
||||
List.iter (add_job conn) |
||||
|
||||
let add () dbpath datadir = |
||||
let datadir = Fpath.v datadir in |
||||
Logs.debug (fun m -> m "Data dir: %a" Fpath.pp datadir); |
||||
let conn = |
||||
match Caqti_blocking.connect (Uri.make ~scheme:"sqlite3" ~path:dbpath ~query:["create", ["false"]] ()) with |
||||
| Error e -> |
||||
Logs.err (fun m -> m "Error connecting to database: %a" Caqti_error.pp e); |
||||
exit 1 |
||||
| Ok conn -> |
||||
conn |
||||
in |
||||
match add_jobs conn datadir with |
||||
| Ok () -> () |
||||
| Error (`Msg e) -> |
||||
Logs.err (fun m -> m "Error getting jobs: %s\n" e); |
||||
exit 2 |
||||
|
||||
let do_migrate dbpath = |
||||
Caqti_blocking.connect (Uri.make ~scheme:"sqlite3" ~path:dbpath ()) |
||||
>>= fun (module Db : Caqti_blocking.CONNECTION) -> |
||||
List.fold_left |
||||
(fun r migrate -> |
||||
r >>= fun () -> |
||||
Logs.debug (fun m -> m "Executing migration query: %a" Caqti_request.pp migrate); |
||||
Db.exec migrate ()) |
||||
(Ok ()) |
||||
Builder_db.migrate |
||||
|
||||
let migrate () dbpath = |
||||
match do_migrate dbpath with |
||||
| Ok () -> () |
||||
| Error e -> |
||||
Format.eprintf "Database error: %a" Caqti_error.pp e; |
||||
exit 1 |
||||
|
||||
let help man_format cmds = function |
||||
| None -> `Help (man_format, None) |
||||
| Some cmd -> |
||||
if List.mem cmd cmds |
||||
then `Help (man_format, Some cmd) |
||||
else `Error (true, "Unknown command: " ^ cmd) |
||||
|
||||
let dbpath = |
||||
let doc = "sqlite3 database path" in |
||||
Cmdliner.Arg.(value & |
||||
opt non_dir_file "builder.sqlite3" & |
||||
info ~doc ["dbpath"]) |
||||
|
||||
let dbpath_new = |
||||
let doc = "sqlite3 database path" in |
||||
Cmdliner.Arg.(value & |
||||
opt string "builder.sqlite3" & |
||||
info ~doc ["dbpath"]) |
||||
|
||||
let datadir = |
||||
let doc = Cmdliner.Arg.info ~doc:"builder data dir" ["datadir"] in |
||||
Cmdliner.Arg.(value & |
||||
opt dir "/var/db/builder/" doc) |
||||
|
||||
let setup_log = |
||||
let setup_log level = |
||||
Logs.set_level level; |
||||
Logs.set_reporter (Logs_fmt.reporter ~dst:Format.std_formatter ()); |
||||
Logs.debug (fun m -> m "Set log level %s" (Logs.level_to_string level)) |
||||
in |
||||
Cmdliner.Term.(const setup_log $ Logs_cli.level ()) |
||||
|
||||
let migrate_cmd = |
||||
let doc = "create database and add tables" in |
||||
Cmdliner.Term.(pure migrate $ setup_log $ dbpath_new), |
||||
Cmdliner.Term.info ~doc "migrate" |
||||
|
||||
let add_cmd = |
||||
let doc = "populates database with builder data" in |
||||
let man = |
||||
[ `S "DESCRIPTION"; |
||||
`P "Scrape builder data directory information and insert into builder-web database."; |
||||
`P "It assumes the `full' files are stored in a directory hierarchy of the following shape:"; |
||||
`Pre "/path/to/datadir/JOB-NAME/BUILD-UUID/full"; |
||||
`P "Before parsing, The UUID in the filesystem is looked up in the database \ |
||||
to see if already exists.\ |
||||
It is assumed the UUIDs correspond."; |
||||
] |
||||
in |
||||
(Cmdliner.Term.(pure add $ setup_log $ dbpath $ datadir), |
||||
Cmdliner.Term.info ~doc ~man "add") |
||||
|
||||
let help_cmd = |
||||
let topic = |
||||
let doc = "Command to get help on" in |
||||
Cmdliner.Arg.(value & pos 0 (some string) None & info ~doc ~docv:"COMMAND" []) |
||||
in |
||||
let doc = "Builder database help" in |
||||
Cmdliner.Term.(ret (const help $ man_format $ choice_names $ topic)), |
||||
Cmdliner.Term.info ~doc "help" |
||||
|
||||
let default_cmd = |
||||
let doc = "Builder database command" in |
||||
Cmdliner.Term.(ret (const help $ man_format $ choice_names $ const None)), |
||||
Cmdliner.Term.info ~doc "builder-db" |
||||
|
||||
let () = |
||||
Cmdliner.Term.eval_choice |
||||
default_cmd |
||||
[help_cmd; add_cmd; migrate_cmd] |
||||
|> Cmdliner.Term.exit |
@ -1,4 +1,11 @@ |
||||
(executable |
||||
(public_name builder_web) |
||||
(name builder_web_app) |
||||
(modules builder_web_app) |
||||
(libraries builder_web)) |
||||
|
||||
(executable |
||||
(public_name builder-db) |
||||
(name builder_db) |
||||
(modules builder_db) |
||||
(libraries builder_db caqti.blocking uri bos fmt logs logs.cli logs.fmt cmdliner)) |
||||
|
@ -0,0 +1,342 @@ |
||||
module Rep = Representation |
||||
open Rep |
||||
|
||||
type id = Rep.id |
||||
|
||||
type file = { |
||||
filepath : Fpath.t; |
||||
localpath : Fpath.t; |
||||
sha256 : Cstruct.t; |
||||
} |
||||
|
||||
let file = |
||||
let encode { filepath; localpath; sha256 } = |
||||
Ok (filepath, localpath, sha256) in |
||||
let decode (filepath, localpath, sha256) = |
||||
Ok { filepath; localpath; sha256 } in |
||||
Caqti_type.custom ~encode ~decode Caqti_type.(tup3 fpath fpath cstruct) |
||||
|
||||
let last_insert_rowid = |
||||
Caqti_request.find |
||||
Caqti_type.unit |
||||
id |
||||
"SELECT last_insert_rowid()" |
||||
|
||||
module Job = struct |
||||
let migrate = |
||||
Caqti_request.exec |
||||
Caqti_type.unit |
||||
{| CREATE TABLE job ( |
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, |
||||
name VARCHAR(255) NOT NULL UNIQUE |
||||
) |
||||
|} |
||||
|
||||
let rollback = |
||||
Caqti_request.exec |
||||
Caqti_type.unit |
||||
{| DROP TABLE IF EXISTS job |} |
||||
|
||||
let get = |
||||
Caqti_request.find_opt |
||||
id |
||||
Caqti_type.string |
||||
"SELECT name FROM job WHERE id = ?" |
||||
|
||||
let get_id_by_name = |
||||
Caqti_request.find |
||||
Caqti_type.string |
||||
id |
||||
"SELECT id FROM job WHERE name = ?" |
||||
|
||||
let get_all = |
||||
Caqti_request.collect |
||||
Caqti_type.unit |
||||
Caqti_type.(tup2 id string) |
||||
"SELECT id, name FROM job ORDER BY name ASC" |
||||
|
||||
let try_add = |
||||
Caqti_request.exec |
||||
Caqti_type.string |
||||
"INSERT OR IGNORE INTO job (name) VALUES (?)" |
||||
|
||||
let remove = |
||||
Caqti_request.exec |
||||
id |
||||
"DELETE FROM job WHERE id = ?" |
||||
end |
||||
|
||||
module Build_artifact = struct |
||||
let migrate = |
||||
Caqti_request.exec |
||||
Caqti_type.unit |
||||
{| CREATE TABLE build_artifact ( |
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, |
||||
filepath TEXT NOT NULL, -- the path as in the build |
||||
localpath TEXT NOT NULL, -- local path to the file on disk |
||||
sha256 BLOB NOT NULL, |
||||
build INTEGER NOT NULL, |
||||
|
||||
FOREIGN KEY(build) REFERENCES build(id), |
||||
UNIQUE(build, filepath) |
||||
) |
||||
|} |
||||
|
||||
let rollback = |
||||
Caqti_request.exec |
||||
Caqti_type.unit |
||||
"DROP TABLE IF EXISTS build_artifact" |
||||
|
||||
let get_by_build_uuid = |
||||
Caqti_request.find_opt |
||||
(Caqti_type.tup2 uuid fpath) |
||||
(Caqti_type.tup2 fpath cstruct) |
||||
{| SELECT build_artifact.localpath, build_artifact.sha256 |
||||
FROM build_artifact |
||||
INNER JOIN build ON build.id = build_artifact.build |
||||
WHERE build.uuid = ? AND build_artifact.filepath = ? |
||||
|} |
||||
|
||||
let get_all_by_build = |
||||
Caqti_request.collect |
||||
id |
||||
Caqti_type.(tup2 |
||||
id |
||||
file) |
||||
"SELECT id, filepath, localpath, sha256 FROM build_artifact WHERE build = ?" |
||||
|
||||
let add = |
||||
Caqti_request.exec |
||||
Caqti_type.(tup2 file id) |
||||
"INSERT INTO build_artifact (filepath, localpath, sha256, build) |
||||
VALUES (?, ?, ?, ?)" |
||||
|
||||
let remove_by_build = |
||||
Caqti_request.exec |
||||
id |
||||
"DELETE FROM build_artifact WHERE build = ?" |
||||
end |
||||
|
||||
module Build_file = struct |
||||
let migrate = |
||||
Caqti_request.exec |
||||
Caqti_type.unit |
||||
{| CREATE TABLE build_file ( |
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, |
||||
filepath TEXT NOT NULL, -- the path as in the build |
||||
localpath TEXT NOT NULL, -- local path to the file on disk |
||||
sha256 BLOB NOT NULL, |
||||
build INTEGER NOT NULL, |
||||
|
||||
FOREIGN KEY(build) REFERENCES build(id), |
||||
UNIQUE(build, filepath) |
||||
) |
||||
|} |
||||
|
||||
let rollback = |
||||
Caqti_request.exec |
||||
Caqti_type.unit |
||||
"DROP TABLE IF EXISTS build_file" |
||||
|
||||
let get_by_build_uuid = |
||||
Caqti_request.find_opt |
||||
(Caqti_type.tup2 uuid fpath) |
||||
(Caqti_type.tup2 fpath cstruct) |
||||
{| SELECT build_file.localpath, build_file.sha256 |
||||
FROM build_file |
||||
INNER JOIN build ON build.id = build_file.build |
||||
WHERE build.uuid = ? AND build_file.filepath = ? |
||||
|} |
||||
|
||||
let get_all_by_build = |
||||
Caqti_request.collect |
||||
id |
||||
Caqti_type.(tup2 |
||||
id |
||||
file) |
||||
"SELECT id, filepath, localpath, sha256 FROM build_file WHERE build = ?" |
||||
|
||||
let add = |
||||
Caqti_request.exec |
||||
Caqti_type.(tup2 file id) |
||||
"INSERT INTO build_file (filepath, localpath, sha256, build) |
||||
VALUES (?, ?, ?, ?)" |
||||
|
||||
let remove_by_build = |
||||
Caqti_request.exec |
||||
id |
||||
"DELETE FROM build_file WHERE build = ?" |
||||
end |
||||
|
||||
module Build = struct |
||||
type t = { |
||||
uuid : Uuidm.t; |
||||
start : Ptime.t; |
||||
finish : Ptime.t; |
||||
result : Builder.execution_result; |
||||
console : (int * string) list; |
||||
script : string; |
||||
job_id : id; |
||||
} |
||||
|
||||
let t = |
||||
let rep = |
||||
Caqti_type.(tup2 |
||||
(tup4 |
||||
uuid |
||||
(tup2 |
||||
Rep.ptime |
||||
Rep.ptime) |
||||
(tup2 |
||||
execution_result |
||||
console) |
||||
string) |
||||
id) |
||||
in |
||||
let encode { uuid; start; finish; result; console; script; job_id } = |
||||
Ok ((uuid, (start, finish), (result, console), script), job_id) |
||||
in |
||||
let decode ((uuid, (start, finish), (result, console), script), job_id) = |
||||
Ok { uuid; start; finish; result; console; script; job_id } |
||||
in |
||||
Caqti_type.custom ~encode ~decode rep |
||||
|
||||
module Meta = struct |
||||
type t = { |
||||
uuid : Uuidm.t; |
||||
start : Ptime.t; |
||||
finish : Ptime.t; |
||||
result : Builder.execution_result; |
||||
job_id : id; |
||||
} |
||||
|
||||
let t = |
||||
let rep = |
||||
Caqti_type.(tup2 |
||||
(tup4 |
||||
uuid |
||||
Rep.ptime |
||||
Rep.ptime |
||||
execution_result) |
||||
id) |
||||
in |
||||
let encode { uuid; start; finish; result; job_id } = |
||||
Ok ((uuid, start, finish, result), job_id) |
||||
in |
||||
let decode ((uuid, start, finish, result), job_id) = |
||||
Ok { uuid; start; finish; result; job_id } |
||||
in |
||||
Caqti_type.custom ~encode ~decode rep |
||||
end |
||||
|
||||
let migrate = |
||||
Caqti_request.exec |
||||
Caqti_type.unit |
||||
{| CREATE TABLE build ( |
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, |
||||
uuid VARCHAR(36) NOT NULL UNIQUE, |
||||
start_d INTEGER NOT NULL, |
||||
start_ps INTEGER NOT NULL, |
||||
finish_d INTEGER NOT NULL, |
||||
finish_ps INTEGER NOT NULL, |
||||
result_kind TINYINT NOT NULL, |
||||
result_code INTEGER, |
||||
result_msg TEXT, |
||||
console BLOB NOT NULL, |
||||
script TEXT NOT NULL, |
||||
job INTEGER NOT NULL, |
||||
|
||||
FOREIGN KEY(job) REFERENCES job(id) |
||||
) |
||||
|} |
||||
|
||||
let rollback = |
||||
Caqti_request.exec |
||||
Caqti_type.unit |
||||
{| DROP TABLE IF EXISTS build |} |
||||
|
||||
let get_opt = |
||||
Caqti_request.find_opt |
||||
Caqti_type.int64 |
||||
t |
||||
{| SELECT uuid, start_d, start_ps, finish_d, finish_ps, |
||||
result_kind, result_code, result_msg, |
||||
console, script, job |
||||
FROM build |
||||
WHERE id = ? |
||||
|} |
||||
|
||||
let get_by_uuid = |
||||
Caqti_request.find_opt |
||||
Rep.uuid |
||||
(Caqti_type.tup2 id t) |
||||
{| SELECT id, uuid, start_d, start_ps, finish_d, finish_ps, |
||||
result_kind, result_code, result_msg, |
||||
console, script, job |
||||
FROM build |
||||
WHERE uuid = ? |
||||
|} |
||||
|
||||
let get_all = |
||||
Caqti_request.collect |
||||
Caqti_type.int64 |
||||
(Caqti_type.tup2 id t) |
||||
{| SELECT id, uuid, start_d, start_ps, finish_d, finish_ps, |
||||
result_kind, result_code, result_msg, console, |
||||
script, job |
||||
FROM build |
||||
WHERE job = ? |
||||
ORDER BY start_d ASC, start_ps ASC |
||||
|} |
||||
|
||||
let get_all_meta = |
||||
Caqti_request.collect |
||||
Caqti_type.int64 |
||||
(Caqti_type.tup2 |
||||
id Meta.t) |
||||
{| SELECT id, uuid, start_d, start_ps, finish_d, finish_ps, |
||||
result_kind, result_code, result_msg, job |
||||
FROM build |
||||
WHERE job = ? |
||||
ORDER BY start_d ASC, start_ps ASC |
||||
|} |
||||
|
||||
let get_all_meta_by_name = |
||||
Caqti_request.collect |
||||
Caqti_type.string |
||||
(Caqti_type.tup2 |
||||
id Meta.t) |
||||
{| SELECT build.id, build.uuid, |
||||
build.start_d, build.start_ps, build.finish_d, build.finish_ps, |
||||
build.result_kind, build.result_code, build.result_msg, build.job |
||||
FROM build, job |
||||
WHERE job.name = ? AND build.job = job.id |
||||
ORDER BY start_d ASC, start_ps ASC |
||||
|} |
||||
|
||||
|
||||
let add = |
||||
Caqti_request.exec |
||||
t |
||||
{| INSERT INTO build |
||||
(uuid, start_d, start_ps, finish_d, finish_ps, |
||||
result_kind, result_code, result_msg, console, script, job) |
||||
VALUES |
||||
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
||||
|} |
||||
|
||||
end |
||||
|
||||
let migrate = [ |
||||
Job.migrate; |
||||
Build.migrate; |
||||
Build_artifact.migrate; |
||||
Build_file.migrate; |
||||
] |
||||
|
||||
let rollback = [ |
||||
Build_file.migrate; |
||||
Build_artifact.rollback; |
||||
Build.rollback; |
||||
Job.rollback; |
||||
] |
@ -0,0 +1,114 @@ |
||||
type id |
||||
|
||||
type file = { |
||||
filepath : Fpath.t; |
||||
localpath : Fpath.t; |
||||
sha256 : Cstruct.t; |
||||
} |
||||
val file : file Caqti_type.t |
||||
|
||||
val last_insert_rowid : |
||||
(unit, id, [< `Many | `One | `Zero > `One ]) Caqti_request.t |
||||
|
||||
module Job : sig |
||||
val migrate : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
val rollback : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
|
||||
val get : |
||||
(id, string, [< `Many | `One | `Zero > `One `Zero ]) |
||||
Caqti_request.t |
||||
val get_id_by_name : |
||||
(string, id, [< `Many | `One | `Zero > `One ]) Caqti_request.t |
||||
val get_all : |
||||
(unit, id * string, [ `Many | `One | `Zero ]) Caqti_request.t |
||||
val try_add : |
||||
(string, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
val remove : |
||||
(id, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
end |
||||
|
||||
module Build_artifact : sig |
||||
val migrate : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
val rollback : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
|
||||
val get_by_build_uuid : |
||||
(Uuidm.t * Fpath.t, Fpath.t * Cstruct.t, |
||||
[< `Many | `One | `Zero > `One `Zero ]) |
||||
Caqti_request.t |
||||
val get_all_by_build : |
||||
(id, id * file, [ `Many | `One | `Zero ]) Caqti_request.t |
||||
val add : |
||||
(file * id, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
val remove_by_build : |
||||
(id, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
end |
||||
|
||||
module Build_file : sig |
||||
val migrate : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
val rollback : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
|
||||
val get_by_build_uuid : |
||||
(Uuidm.t * Fpath.t, Fpath.t * Cstruct.t, |
||||
[< `Many | `One | `Zero > `One `Zero ]) |
||||
Caqti_request.t |
||||
val get_all_by_build : |
||||
(id, id * file, [ `Many | `One | `Zero ]) Caqti_request.t |
||||
val add : |
||||
(file * id, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
val remove_by_build : |
||||
(id, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
end |
||||
|
||||
module Build : |
||||
sig |
||||
type t = { |
||||
uuid : Uuidm.t; |
||||
start : Ptime.t; |
||||
finish : Ptime.t; |
||||
result : Builder.execution_result; |
||||
console : (int * string) list; |
||||
script : string; |
||||
job_id : id; |
||||
} |
||||
val t : t Caqti_type.t |
||||
module Meta : |
||||
sig |
||||
type t = { |
||||
uuid : Uuidm.t; |
||||
start : Ptime.t; |
||||
finish : Ptime.t; |
||||
result : Builder.execution_result; |
||||
job_id : id; |
||||
} |
||||
val t : t Caqti_type.t |
||||
end |
||||
|
||||
val migrate : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
val rollback : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
|
||||
val get_opt : |
||||
(id, t, [< `Many | `One | `Zero > `One `Zero ]) Caqti_request.t |
||||
val get_by_uuid : |
||||
(Uuidm.t, id * t, [< `Many | `One | `Zero > `One `Zero ]) |
||||
Caqti_request.t |
||||
val get_all : |
||||
(id, id * t, [ `Many | `One | `Zero ]) Caqti_request.t |
||||
val get_all_meta : |
||||
(id, id * Meta.t, [ `Many | `One | `Zero ]) Caqti_request.t |
||||
val get_all_meta_by_name : |
||||
(string, id * Meta.t, [ `Many | `One | `Zero ]) Caqti_request.t |
||||
val add : (t, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t |
||||
end |
||||
|
||||
val migrate : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t list |
||||
val rollback : |
||||
(unit, unit, [< `Many | `One | `Zero > `Zero ]) Caqti_request.t list |
@ -0,0 +1,3 @@ |
||||
(library |
||||
(name builder_db) |
||||
(libraries builder caqti caqti-driver-sqlite3 asn1-combinators mirage-crypto)) |
@ -0,0 +1,74 @@ |
||||
module Asn = struct |
||||
let decode_strict codec cs = |
||||
match Asn.decode codec cs with |
||||
| Ok (a, cs) -> |
||||
if Cstruct.len cs = 0 |
||||
then Ok a |
||||
else Error "trailing bytes" |
||||
| Error (`Parse msg) -> Error ("parse error: " ^ msg) |
||||
|
||||
let projections_of asn = |
||||
let c = Asn.codec Asn.der asn in |
||||
(decode_strict c, Asn.encode c) |
||||
|
||||
let console = |
||||
Asn.S.(sequence_of |
||||
(sequence2 |
||||
(required ~label:"delta" int) |
||||
(required ~label:"data" utf8_string))) |
||||
|
||||
let console_of_cs, console_to_cs = projections_of console |
||||
end |
||||
|
||||
type id = int64 |
||||
let id = Caqti_type.int64 |
||||
|
||||
let uuid = |
||||
let encode uuid = Ok (Uuidm.to_bytes uuid) in |
||||
let decode s = |
||||
Uuidm.of_bytes s |
||||
|> Option.to_result ~none:"failed to decode uuid" |
||||
in |
||||
Caqti_type.custom ~encode ~decode Caqti_type.string |
||||
|
||||
|
||||
let ptime = |
||||
let encode t = Ok (Ptime.Span.to_d_ps (Ptime.to_span t)) in |
||||
let decode (d, ps) = Ok (Ptime.v (d, ps)) |
||||
in |
||||
let rep = Caqti_type.(tup2 int int64) in |
||||
Caqti_type.custom ~encode ~decode rep |
||||
|
||||
let fpath = |
||||
let encode t = Ok (Fpath.to_string t) in |
||||
let decode s = Fpath.of_string s |
||||
|> Result.map_error (fun (`Msg s) -> s) in |
||||
Caqti_type.custom ~encode ~decode Caqti_type.string |
||||
|
||||
let cstruct = |
||||
let encode t = Ok (Cstruct.to_string t) in |
||||
let decode s = Ok (Cstruct.of_string s) in |
||||
Caqti_type.custom ~encode ~decode Caqti_type.octets |
||||
|
||||
let execution_result = |
||||
let encode = function |
||||
| Builder.Exited v -> Ok (0, Some v, None) |
||||
| Builder.Signalled v -> Ok (1, Some v, None) |
||||
| Builder.Stopped v -> Ok (2, Some v, None) |
||||
| Builder.Msg msg -> Ok (3, None, Some msg) |
||||
in |
||||
let decode (kind, code, msg) = |
||||
match kind, code, msg with |
||||
| 0, Some v, None -> Ok (Builder.Exited v) |
||||
| 1, Some v, None -> Ok (Builder.Signalled v) |
||||
| 2, Some v, None -> Ok (Builder.Stopped v) |
||||
| 3, None, Some msg -> Ok (Builder.Msg msg) |
||||
| _ -> Error "bad encoding" |
||||
in |
||||
let rep = Caqti_type.(tup3 int (option int) (option string)) in |
||||
Caqti_type.custom ~encode ~decode rep |
||||
|
||||
let console = |
||||
let encode console = Ok (Asn.console_to_cs console) in |
||||
let decode data = Asn.console_of_cs data in |
||||
Caqti_type.custom ~encode ~decode cstruct |
@ -1,3 +1,3 @@ |
||||
(library |
||||
(name builder_web) |
||||
(libraries builder opium tyxml bos rresult duration hex)) |
||||
(libraries builder builder_db opium tyxml bos rresult duration hex caqti-lwt)) |
||||
|
@ -1,119 +1,52 @@ |
||||
let src = Logs.Src.create "builder-web.model" ~doc:"Builder_web model" |
||||
module Log = (val Logs.src_log src : Logs.LOG) |
||||
|
||||
open Lwt.Syntax |
||||
open Lwt_result.Infix |
||||
|
||||
module RunMap = Map.Make(struct |
||||
type t = Fpath.t * Fpath.t |
||||
let compare (j1,r1) (j2,r2) = |
||||
Fpath.(compare (j1 // r1) (j2 // r2)) |
||||
end) |
||||
|
||||
type job_run_meta = { |
||||
job_info : Builder.job; |
||||
uuid : Uuidm.t; |
||||
start : Ptime.t; |
||||
finish : Ptime.t; |
||||
result : Builder.execution_result; |
||||
} |
||||
|
||||
type job_run_info = { |
||||
meta : job_run_meta; |
||||
out : (int * string) list; |
||||
data : (Fpath.t * string) list |
||||
} |
||||
|
||||
type digest = { |
||||
sha256 : Cstruct.t; |
||||
} |
||||
|
||||
type t = { |
||||
dir : Fpath.t; |
||||
mutable meta_cache : job_run_meta RunMap.t; |
||||
mutable digest_cache : (Fpath.t * digest) list RunMap.t |
||||
} |
||||
|
||||
let init dir = { dir; meta_cache = RunMap.empty; digest_cache = RunMap.empty; } |
||||
|
||||
type job = { |
||||
path : Fpath.t; |
||||
runs : job_run_meta list; |
||||
} |
||||
|
||||
let job_name { path; _ } = Fpath.to_string path |
||||
|
||||
let read_full t path run = |
||||
let f = Fpath.(t.dir // path // run / "full") in |
||||
let* ic = Lwt_io.open_file ~mode:Lwt_io.Input (Fpath.to_string f) in |
||||
let+ s = Lwt_io.read ic in |
||||
let open Rresult.R.Infix in |
||||
Builder.Asn.exec_of_cs (Cstruct.of_string s) |
||||
>>| fun (job_info, uuid, out, start, finish, result, data) -> |
||||
let meta = { job_info; uuid; start; finish; result } in |
||||
t.meta_cache <- RunMap.add (path, run) meta t.meta_cache; |
||||
{ meta; out; data } |
||||
|
||||
let digest (path, data) = |
||||
let module H = Mirage_crypto.Hash in |
||||
let data = Cstruct.of_string data in |
||||
(path, { |
||||
sha256 = H.SHA256.digest data; |
||||
}) |
||||
let src = Logs.Src.create "builder-web.model" ~doc:"Builder_web model" |
||||
module Log = (val Logs.src_log src : Logs.LOG) |
||||
|
||||
let read_full_with_digests t path run = |
||||
read_full t path run >|= fun ({ data; _ } as full) -> |
||||
match RunMap.find_opt (path, run) t.digest_cache with |
||||
| Some digests -> full, digests |
||||
module type CONN = Caqti_lwt.CONNECTION |
||||
|
||||
type error = [ Caqti_error.call_or_retrieve | `Not_found | `File_error of Fpath.t ] |
||||
|
||||
let pp_error ppf = function |
||||
| `Not_found -> Format.fprintf ppf "value not found in database" |
||||
| `File_error path -> Format.fprintf ppf "error reading file %a" Fpath.pp path |
||||
| #Caqti_error.call_or_retrieve as e -> |
||||
Caqti_error.pp ppf e |
||||
|
||||
let not_found = function |
||||
| None -> Lwt.return (Error `Not_found :> (_, [> error ]) result) |
||||
| Some v -> Lwt_result.return v |
||||
|
||||
let read_file filepath = |
||||
Lwt.try_bind |
||||
(fun () -> Lwt_io.open_file ~mode:Lwt_io.Input (Fpath.to_string filepath)) |
||||
(fun ic -> Lwt_result.ok (Lwt_io.read ic)) |
||||
(function |
||||
| Unix.Unix_error (e, _, _) -> |
||||
Logs.warn (fun m -> m "Error reading local file %a: %s" |
||||
Fpath.pp filepath (Unix.error_message e)); |
||||
Lwt.return_error (`File_error filepath) |
||||
| e -> Lwt.fail e) |
||||
|
||||
let build_artifact build filepath (module Db : CONN) = |
||||
Db.find_opt Builder_db.Build_artifact.get_by_build_uuid (build, filepath) |
||||
>>= function |
||||
| Some (localpath, sha256) -> |
||||
read_file localpath >|= fun data -> data, sha256 |
||||
| None -> |
||||
let digests = List.map digest data in |
||||
t.digest_cache <- RunMap.add (path, run) digests t.digest_cache; |
||||
full, digests |
||||
Lwt.return_error `Not_found |
||||
|
||||
let read_full_meta t path run = |
||||
match RunMap.find_opt (path, run) t.meta_cache with |
||||
| Some meta -> |
||||
Lwt_result.lift (Bos.OS.File.exists Fpath.(t.dir // path // run / "full")) >>= fun exists -> |
||||
if exists |
||||
then Lwt_result.return meta |
||||
else |
||||
(t.meta_cache <- RunMap.remove (path, run) t.meta_cache; |
||||
Lwt_result.fail (`Msg "no such file")) |
||||
| None -> |
||||
read_full t path run >|= fun { meta; out = _; data = _ } -> |
||||
meta |
||||
let build_artifacts build (module Db : CONN) = |
||||
Db.collect_list Builder_db.Build_artifact.get_all_by_build build >|= |
||||
List.map snd |
||||
|
||||
let build uuid (module Db : CONN) = |
||||
Db.find_opt Builder_db.Build.get_by_uuid uuid >>= |
||||
not_found |
||||
|
||||
let job t job = |
||||
let path = Fpath.(t.dir // job) in |
||||
let open Lwt_result.Infix in |
||||
Lwt_result.lift (Bos.OS.Dir.contents ~rel:true path) >>= fun runs -> |
||||
let+ runs = |
||||
Lwt_list.filter_map_s (fun run -> |
||||
let+ meta = read_full_meta t job run in |
||||
match meta with |
||||
| Error (`Msg e) -> |
||||
Log.warn (fun m -> m "error reading job run file %a: %s" |
||||
Fpath.pp Fpath.(path // run) e); |
||||
None |
||||
| Ok meta -> Some meta) |
||||
runs |
||||
in |
||||
Ok { path = job; runs } |
||||
let job job (module Db : CONN) = |
||||
Db.collect_list Builder_db.Build.get_all_meta_by_name job |
||||
|
||||
let jobs t = |
||||
let r = |
||||
let open Rresult.R.Infix in |
||||
Bos.OS.Dir.contents ~rel:true t.dir >>| |
||||
List.filter (fun f -> not (Fpath.equal (Fpath.v "state") f)) >>| |
||||
List.filter_map (fun f -> |
||||
match Bos.OS.Dir.exists Fpath.(t.dir // f) with |
||||
| Ok true -> Some f |
||||
| Ok false -> |
||||
Log.warn (fun m -> m "dir %a doesn't exist" Fpath.pp |
||||
Fpath.(t.dir // f)); |
||||
None |
||||
| Error (`Msg e) -> |
||||
Log.warn (fun m -> m "error reading job dir %a: %s" Fpath.pp |
||||
Fpath.(t.dir // f) e); |
||||
None) |
||||
in Lwt.return r |
||||
let jobs (module Db : CONN) = |
||||
Db.collect_list Builder_db.Job.get_all () >|= |
||||
List.map snd |
||||
|
@ -1,36 +1,18 @@ |
||||
type t |
||||
type error = [ Caqti_error.call_or_retrieve | `Not_found | `File_error of Fpath.t ] |
||||
|
||||
type job_run_meta = { |
||||
job_info : Builder.job; |
||||
uuid : Uuidm.t; |
||||
start : Ptime.t; |
||||
finish : Ptime.t; |
||||
result : Builder.execution_result; |
||||
} |
||||
val pp_error : Format.formatter -> error -> unit |
||||
|
||||
type digest = { |
||||
sha256 : Cstruct.t; |
||||
} |
||||
val build_artifact : Uuidm.t -> Fpath.t -> Caqti_lwt.connection -> |
||||
(string * Cstruct.t, [> error ]) result Lwt.t |
||||
|
||||
type job_run_info = { |
||||
meta : job_run_meta; |
||||
out : (int * string) list; |
||||
data : (Fpath.t * string) list |
||||
} |
||||
val build_artifacts : Builder_db.id -> Caqti_lwt.connection -> |
||||
(Builder_db.file list, [> error ]) result Lwt.t |
||||
|
||||
type job = { |
||||
path : Fpath.t; |
||||
runs : job_run_meta list; |
||||
} |
||||
val build : Uuidm.t -> Caqti_lwt.connection -> |
||||
(Builder_db.id * Builder_db.Build.t, [> error ]) result Lwt.t |
||||
|
||||
val init : Fpath.t -> t |
||||
val job : string -> Caqti_lwt.connection -> |
||||
((Builder_db.id * Builder_db.Build.Meta.t) list, [> error ]) result Lwt.t |
||||
|
||||
val job_name : job -> string |
||||
|
||||
val read_full : t -> Fpath.t -> Fpath.t -> (job_run_info, [> `Msg of string ]) result Lwt.t |
||||
|
||||
val read_full_with_digests : t -> Fpath.t -> Fpath.t -> |
||||
(job_run_info * (Fpath.t * digest) list, [> `Msg of string ]) result Lwt.t |
||||
|
||||
val job : t -> Fpath.t -> (job, [> `Msg of string]) result Lwt.t |
||||
val jobs : t -> (Fpath.t list, [> `Msg of string ]) result Lwt.t |
||||
val jobs : Caqti_lwt.connection -> |
||||
(string list, [> error ]) result Lwt.t |
||||
|
Loading…
Reference in new issue