From b08006dbb212ecb2ebee9f0d0310a2efde4fd27e Mon Sep 17 00:00:00 2001 From: Vincent Balat Date: Fri, 22 Aug 2025 16:39:32 +0200 Subject: [PATCH 01/21] Add Eliom_stream, copy of Lwt_stream --- src/lib/eliom_stream.shared.ml | 983 ++++++++++++++++++++++++++++++++ src/lib/eliom_stream.shared.mli | 400 +++++++++++++ 2 files changed, 1383 insertions(+) create mode 100644 src/lib/eliom_stream.shared.ml create mode 100644 src/lib/eliom_stream.shared.mli diff --git a/src/lib/eliom_stream.shared.ml b/src/lib/eliom_stream.shared.ml new file mode 100644 index 0000000000..780fb67bff --- /dev/null +++ b/src/lib/eliom_stream.shared.ml @@ -0,0 +1,983 @@ +(* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *) + +open Lwt.Infix + +exception Closed +exception Full +exception Empty + +(* A node in a queue of pending data. *) +type 'a node = + { mutable next : 'a node + ; (* Next node in the queue. For the last node it points to itself. *) + mutable data : 'a option + (* Data of this node. For the last node it is always [None]. *) } + +(* Note: a queue for an exhausted stream is represented by a node + containing [None] followed by a node with itself as next and [None] + as data. *) + +let new_node () = + let rec node = {next = node; data = None} in + node + +(* Type of a stream source using a function to create new elements. *) +type 'a from = + { from_create : unit -> 'a option Lwt.t + ; (* Function used to create new elements. *) + mutable from_thread : unit Lwt.t + (* Thread which: + + - wait for the thread returned by the last call to [from_next], + - add the next element to the end of the queue. + + If it is a sleeping thread, then it must be used instead of creating a + new one with [from_create]. *) + } + +(* Type of a stream source for push streams. *) +[@@@ocaml.warning "-69"] + +type push = + { mutable push_signal : unit Lwt.t + ; (* Thread signaled when a new element is added to the stream. *) + mutable push_waiting : bool + ; (* Is a thread waiting on [push_signal] ? *) + mutable push_external : Obj.t [@ocaml.warning "-69"] + (* Reference to an external source. *) } + +(* Type of a stream source for bounded-push streams. *) +type 'a push_bounded = + { mutable pushb_signal : unit Lwt.t + ; (* Thread signaled when a new element is added to the stream. *) + mutable pushb_waiting : bool + ; (* Is a thread waiting on [pushb_signal] ? *) + mutable pushb_size : int + ; (* Size of the queue. *) + mutable pushb_count : int + ; (* Current length of the queue. *) + mutable pushb_pending : 'a option + ; (* The next element to push if a thread blocked on push. We store it + here to be sure it will be the first element to be added when + space becomes available. *) + mutable pushb_push_waiter : unit Lwt.t + ; mutable pushb_push_wakener : unit Lwt.u + ; (* Thread blocked on push. *) + mutable pushb_external : Obj.t [@ocaml.warning "-69"] + (* Reference to an external source. *) } + +[@@@ocaml.warning "+69"] + +(* Source of a stream. *) +type 'a source = + | From of 'a from + | From_direct of (unit -> 'a option) + | Push of push + | Push_bounded of 'a push_bounded + +type 'a t = + { source : 'a source + ; (* The source of the stream. *) + close : unit Lwt.u + ; (* A wakener for a thread that sleeps until the stream is closed. *) + closed : unit Lwt.t + ; (* A waiter for a thread that sleeps until the stream is closed. *) + mutable node : 'a node + ; (* Pointer to first pending element, or to [last] if there is no + pending element. *) + last : 'a node ref + (* Node marking the end of the queue of pending elements. *) } + +class type ['a] bounded_push = object + method size : int + method resize : int -> unit + method push : 'a -> unit Lwt.t + method close : unit + method count : int + method blocked : bool + method closed : bool + method set_reference : 'a. 'a -> unit +end + +(* The only difference between two clones is the pointer to the first + pending element. *) +let clone s = + (match s.source with + | Push_bounded _ -> invalid_arg "Lwt_stream.clone" + | From _ | From_direct _ | Push _ -> ()); + { source = s.source + ; close = s.close + ; closed = s.closed + ; node = s.node + ; last = s.last } + +let from_source source = + let node = new_node () in + let closed, close = Lwt.wait () in + {source; close; closed; node; last = ref node} + +let from f = from_source (From {from_create = f; from_thread = Lwt.return_unit}) +let from_direct f = from_source (From_direct f) +let closed s = s.closed +let is_closed s = not (Lwt.is_sleeping (closed s)) + +let enqueue' e last = + let node = !last and new_last = new_node () in + node.data <- e; + node.next <- new_last; + last := new_last + +let enqueue e s = enqueue' e s.last + +let create_with_reference () = + (* Create the source for notifications of new elements. *) + let source, push_signal_resolver = + let push_signal, push_signal_resolver = Lwt.wait () in + ( {push_signal; push_waiting = false; push_external = Obj.repr ()} + , ref push_signal_resolver ) + in + let t = from_source (Push source) in + (* [push] should not close over [t] so that it can be garbage collected even + * there are still references to [push]. Unpack all the components of [t] + * that [push] needs and reference those identifiers instead. *) + let close = t.close and closed = t.closed and last = t.last in + (* The push function. It does not keep a reference to the stream. *) + let push x = + if not (Lwt.is_sleeping closed) then raise Closed; + (* Push the element at the end of the queue. *) + enqueue' x last; + (* Send a signal if at least one thread is waiting for a new + element. *) + if source.push_waiting + then ( + source.push_waiting <- false; + (* Update threads. *) + let old_push_signal_resolver = !push_signal_resolver in + let new_waiter, new_push_signal_resolver = Lwt.wait () in + source.push_signal <- new_waiter; + push_signal_resolver := new_push_signal_resolver; + (* Signal that a new value has been received. *) + Lwt.wakeup_later old_push_signal_resolver ()); + (* Do this at the end in case one of the function raise an + exception. *) + if x = None then Lwt.wakeup close () + in + t, push, fun x -> source.push_external <- Obj.repr x + +let return a = + let stream, push, _ = create_with_reference () in + push (Some a); push None; stream + +let return_lwt a = + let source, push, _ = create_with_reference () in + Lwt.dont_wait + (fun () -> Lwt.bind a (fun x -> push (Some x); push None; Lwt.return_unit)) + (fun _exc -> push None); + source + +let of_seq s = + let s = ref s in + let get () = + match !s () with + | Seq.Nil -> None + | Seq.Cons (elt, s') -> + s := s'; + Some elt + in + from_direct get + +let of_lwt_seq s = + let s = ref s in + let get () = + !s () >|= function + | Lwt_seq.Nil -> None + | Lwt_seq.Cons (elt, s') -> + s := s'; + Some elt + in + from get + +let create () = + let source, push, _ = create_with_reference () in + source, push + +let of_iter iter i = + let stream, push = create () in + iter (fun x -> push (Some x)) i; + push None; + stream + +let of_list l = of_iter List.iter l +let of_array a = of_iter Array.iter a +let of_string s = of_iter String.iter s + +(* Add the pending element to the queue and notify the blocked pushed. + + Precondition: info.pushb_pending = Some _ + + This does not modify info.pushb_count. *) +let notify_pusher info last = + (* Push the element at the end of the queue. *) + enqueue' info.pushb_pending last; + (* Clear pending element. *) + info.pushb_pending <- None; + (* Wakeup the pusher. *) + let old_wakener = info.pushb_push_wakener in + let waiter, wakener = Lwt.task () in + info.pushb_push_waiter <- waiter; + info.pushb_push_wakener <- wakener; + Lwt.wakeup_later old_wakener () + +class ['a] bounded_push_impl (info : 'a push_bounded) wakener_cell last close = + object + val mutable closed = false + method size = info.pushb_size + + method resize size = + if size < 0 then invalid_arg "Lwt_stream.bounded_push#resize"; + info.pushb_size <- size; + if info.pushb_count < info.pushb_size && info.pushb_pending <> None + then ( + info.pushb_count <- info.pushb_count + 1; + notify_pusher info last) + + method push x = + if closed + then Lwt.fail Closed + else if info.pushb_pending <> None + then Lwt.fail Full + else if info.pushb_count >= info.pushb_size + then ( + info.pushb_pending <- Some x; + Lwt.catch + (fun () -> info.pushb_push_waiter) + (fun exn -> + match exn with + | Lwt.Canceled -> + info.pushb_pending <- None; + let waiter, wakener = Lwt.task () in + info.pushb_push_waiter <- waiter; + info.pushb_push_wakener <- wakener; + Lwt.reraise exn + | _ -> Lwt.reraise exn)) + else ( + (* Push the element at the end of the queue. *) + enqueue' (Some x) last; + info.pushb_count <- info.pushb_count + 1; + (* Send a signal if at least one thread is waiting for a new + element. *) + if info.pushb_waiting + then ( + info.pushb_waiting <- false; + (* Update threads. *) + let old_wakener = !wakener_cell in + let new_waiter, new_wakener = Lwt.wait () in + info.pushb_signal <- new_waiter; + wakener_cell := new_wakener; + (* Signal that a new value has been received. *) + Lwt.wakeup_later old_wakener ()); + Lwt.return_unit) + + method close = + if not closed + then ( + closed <- true; + let node = !last and new_last = new_node () in + node.data <- None; + node.next <- new_last; + last := new_last; + if info.pushb_pending <> None + then ( + info.pushb_pending <- None; + Lwt.wakeup_later_exn info.pushb_push_wakener Closed); + (* Send a signal if at least one thread is waiting for a new + element. *) + if info.pushb_waiting + then ( + info.pushb_waiting <- false; + let old_wakener = !wakener_cell in + (* Signal that a new value has been received. *) + Lwt.wakeup_later old_wakener ()); + Lwt.wakeup close ()) + + method count = info.pushb_count + method blocked = info.pushb_pending <> None + method closed = closed + + method set_reference : 'a. 'a -> unit = + fun x -> info.pushb_external <- Obj.repr x + end + +let create_bounded size = + if size < 0 then invalid_arg "Lwt_stream.create_bounded"; + (* Create the source for notifications of new elements. *) + let info, wakener_cell = + let waiter, wakener = Lwt.wait () in + let push_waiter, push_wakener = Lwt.task () in + ( { pushb_signal = waiter + ; pushb_waiting = false + ; pushb_size = size + ; pushb_count = 0 + ; pushb_pending = None + ; pushb_push_waiter = push_waiter + ; pushb_push_wakener = push_wakener + ; pushb_external = Obj.repr () } + , ref wakener ) + in + let t = from_source (Push_bounded info) in + t, new bounded_push_impl info wakener_cell t.last t.close + +(* Wait for a new element to be added to the queue of pending element + of the stream. *) +let feed s = + match s.source with + | From from -> + (* There is already a thread started to create a new element, + wait for this one to terminate. *) + if Lwt.is_sleeping from.from_thread + then Lwt.protected from.from_thread + else + (* Otherwise request a new element. *) + let thread = + (* The function [from_create] can raise an exception (with + [raise], rather than returning a failed promise with + [Lwt.fail]). In this case, we have to catch the exception + and turn it into a safe failed promise. *) + Lwt.catch + (fun () -> + from.from_create () >>= fun x -> + (* Push the element to the end of the queue. *) + enqueue x s; + if x = None then Lwt.wakeup s.close (); + Lwt.return_unit) + Lwt.reraise + in + (* Allow other threads to access this thread. *) + from.from_thread <- thread; + Lwt.protected thread + | From_direct f -> + let x = f () in + (* Push the element to the end of the queue. *) + enqueue x s; + if x = None then Lwt.wakeup s.close (); + Lwt.return_unit + | Push push -> + push.push_waiting <- true; + Lwt.protected push.push_signal + | Push_bounded push -> + push.pushb_waiting <- true; + Lwt.protected push.pushb_signal + +(* Remove [node] from the top of the queue, or do nothing if it was + already consumed. + + Precondition: node.data <> None +*) +let consume s node = + if node == s.node + then ( + s.node <- node.next; + match s.source with + | Push_bounded info -> + if info.pushb_pending = None + then info.pushb_count <- info.pushb_count - 1 + else notify_pusher info s.last + | From _ | From_direct _ | Push _ -> ()) + +let rec peek_rec s node = + if node == !(s.last) + then feed s >>= fun () -> peek_rec s node + else Lwt.return node.data + +let peek s = peek_rec s s.node + +let rec npeek_rec node acc n s = + if n <= 0 + then Lwt.return (List.rev acc) + else if node == !(s.last) + then feed s >>= fun () -> npeek_rec node acc n s + else + match node.data with + | Some x -> npeek_rec node.next (x :: acc) (n - 1) s + | None -> Lwt.return (List.rev acc) + +let npeek n s = npeek_rec s.node [] n s + +let rec get_rec s node = + if node == !(s.last) + then feed s >>= fun () -> get_rec s node + else ( + if node.data <> None then consume s node; + Lwt.return node.data) + +let get s = get_rec s s.node + +let rec get_exn_rec s node = + if node == !(s.last) + then + Lwt.try_bind + (fun () -> feed s) + (fun () -> get_exn_rec s node) + (fun exn -> Lwt.return (Some (Result.Error exn))) + else + match node.data with + | Some value -> + consume s node; + Lwt.return (Some (Result.Ok value)) + | None -> Lwt.return_none + +let wrap_exn s = from (fun () -> get_exn_rec s s.node) + +let rec nget_rec node acc n s = + if n <= 0 + then Lwt.return (List.rev acc) + else if node == !(s.last) + then feed s >>= fun () -> nget_rec node acc n s + else + match s.node.data with + | Some x -> + consume s node; + nget_rec node.next (x :: acc) (n - 1) s + | None -> Lwt.return (List.rev acc) + +let nget n s = nget_rec s.node [] n s + +let rec get_while_rec node acc f s = + if node == !(s.last) + then feed s >>= fun () -> get_while_rec node acc f s + else + match node.data with + | Some x -> + let test = f x in + if test + then ( + consume s node; + get_while_rec node.next (x :: acc) f s) + else Lwt.return (List.rev acc) + | None -> Lwt.return (List.rev acc) + +let get_while f s = get_while_rec s.node [] f s + +let rec get_while_s_rec node acc f s = + if node == !(s.last) + then feed s >>= fun () -> get_while_s_rec node acc f s + else + match node.data with + | Some x -> ( + f x >>= function + | true -> + consume s node; + get_while_s_rec node.next (x :: acc) f s + | false -> Lwt.return (List.rev acc)) + | None -> Lwt.return (List.rev acc) + +let get_while_s f s = get_while_s_rec s.node [] f s + +let rec next_rec s node = + if node == !(s.last) + then feed s >>= fun () -> next_rec s node + else + match node.data with + | Some x -> consume s node; Lwt.return x + | None -> Lwt.fail Empty + +let next s = next_rec s s.node + +let rec last_new_rec node x s = + if node == !(s.last) + then + let thread = feed s in + match Lwt.state thread with + | Lwt.Return _ -> last_new_rec node x s + | Lwt.Fail exn -> Lwt.fail exn + | Lwt.Sleep -> Lwt.return x + else + match node.data with + | Some x -> consume s node; last_new_rec node.next x s + | None -> Lwt.return x + +let last_new s = + let node = s.node in + if node == !(s.last) + then + let thread = next s in + match Lwt.state thread with + | Lwt.Return x -> last_new_rec node x s + | Lwt.Fail _ | Lwt.Sleep -> thread + else + match node.data with + | Some x -> consume s node; last_new_rec node.next x s + | None -> Lwt.fail Empty + +let rec to_list_rec node acc s = + if node == !(s.last) + then feed s >>= fun () -> to_list_rec node acc s + else + match node.data with + | Some x -> + consume s node; + to_list_rec node.next (x :: acc) s + | None -> Lwt.return (List.rev acc) + +let to_list s = to_list_rec s.node [] s + +let rec to_string_rec node buf s = + if node == !(s.last) + then feed s >>= fun () -> to_string_rec node buf s + else + match node.data with + | Some x -> + consume s node; + Buffer.add_char buf x; + to_string_rec node.next buf s + | None -> Lwt.return (Buffer.contents buf) + +let to_string s = to_string_rec s.node (Buffer.create 128) s + +let junk s = + let node = s.node in + if node == !(s.last) + then ( + feed s >>= fun () -> + if node.data <> None then consume s node; + Lwt.return_unit) + else ( + if node.data <> None then consume s node; + Lwt.return_unit) + +let rec njunk_rec node n s = + if n <= 0 + then Lwt.return_unit + else if node == !(s.last) + then feed s >>= fun () -> njunk_rec node n s + else + match node.data with + | Some _ -> + consume s node; + njunk_rec node.next (n - 1) s + | None -> Lwt.return_unit + +let njunk n s = njunk_rec s.node n s + +let rec junk_while_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> junk_while_rec node f s + else + match node.data with + | Some x -> + let test = f x in + if test + then ( + consume s node; + junk_while_rec node.next f s) + else Lwt.return_unit + | None -> Lwt.return_unit + +let junk_while f s = junk_while_rec s.node f s + +let rec junk_while_s_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> junk_while_s_rec node f s + else + match node.data with + | Some x -> ( + f x >>= function + | true -> + consume s node; + junk_while_s_rec node.next f s + | false -> Lwt.return_unit) + | None -> Lwt.return_unit + +let junk_while_s f s = junk_while_s_rec s.node f s + +let rec junk_available_rec node s = + if node == !(s.last) + then + let thread = feed s in + match Lwt.state thread with + | Lwt.Return _ -> junk_available_rec node s + | Lwt.Fail exn -> raise exn + | Lwt.Sleep -> () + else + match node.data with + | Some _ -> + consume s node; + junk_available_rec node.next s + | None -> () + +let junk_available s = junk_available_rec s.node s +let junk_old s = Lwt.return (junk_available s) + +let rec get_available_rec node acc s = + if node == !(s.last) + then + let thread = feed s in + match Lwt.state thread with + | Lwt.Return _ -> get_available_rec node acc s + | Lwt.Fail exn -> raise exn + | Lwt.Sleep -> List.rev acc + else + match node.data with + | Some x -> + consume s node; + get_available_rec node.next (x :: acc) s + | None -> List.rev acc + +let get_available s = get_available_rec s.node [] s + +let rec get_available_up_to_rec node acc n s = + if n <= 0 + then List.rev acc + else if node == !(s.last) + then + let thread = feed s in + match Lwt.state thread with + | Lwt.Return _ -> get_available_up_to_rec node acc n s + | Lwt.Fail exn -> raise exn + | Lwt.Sleep -> List.rev acc + else + match s.node.data with + | Some x -> + consume s node; + get_available_up_to_rec node.next (x :: acc) (n - 1) s + | None -> List.rev acc + +let get_available_up_to n s = get_available_up_to_rec s.node [] n s + +let rec is_empty s = + if s.node == !(s.last) + then feed s >>= fun () -> is_empty s + else Lwt.return (s.node.data = None) + +let map f s = + from (fun () -> + get s >|= function + | Some x -> + let x = f x in + Some x + | None -> None) + +let map_s f s = + from (fun () -> + get s >>= function + | Some x -> f x >|= fun x -> Some x + | None -> Lwt.return_none) + +let filter f s = + let rec next () = + let t = get s in + t >>= function + | Some x -> + let test = f x in + if test then t else next () + | None -> Lwt.return_none + in + from next + +let filter_s f s = + let rec next () = + let t = get s in + t >>= function + | Some x -> ( f x >>= function true -> t | false -> next ()) + | None -> t + in + from next + +let filter_map f s = + let rec next () = + get s >>= function + | Some x -> ( + let x = f x in + match x with Some _ -> Lwt.return x | None -> next ()) + | None -> Lwt.return_none + in + from next + +let filter_map_s f s = + let rec next () = + get s >>= function + | Some x -> ( + let t = f x in + t >>= function Some _ -> t | None -> next ()) + | None -> Lwt.return_none + in + from next + +let map_list f s = + let pendings = ref [] in + let rec next () = + match !pendings with + | [] -> ( + get s >>= function + | Some x -> + let l = f x in + pendings := l; + next () + | None -> Lwt.return_none) + | x :: l -> + pendings := l; + Lwt.return (Some x) + in + from next + +let map_list_s f s = + let pendings = ref [] in + let rec next () = + match !pendings with + | [] -> ( + get s >>= function + | Some x -> + f x >>= fun l -> + pendings := l; + next () + | None -> Lwt.return_none) + | x :: l -> + pendings := l; + Lwt.return (Some x) + in + from next + +let flatten s = map_list (fun l -> l) s + +let rec fold_rec node f s acc = + if node == !(s.last) + then feed s >>= fun () -> fold_rec node f s acc + else + match node.data with + | Some x -> + consume s node; + let acc = f x acc in + fold_rec node.next f s acc + | None -> Lwt.return acc + +let fold f s acc = fold_rec s.node f s acc + +let rec fold_s_rec node f s acc = + if node == !(s.last) + then feed s >>= fun () -> fold_s_rec node f s acc + else + match node.data with + | Some x -> + consume s node; + f x acc >>= fun acc -> fold_s_rec node.next f s acc + | None -> Lwt.return acc + +let fold_s f s acc = fold_s_rec s.node f s acc + +let rec iter_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> iter_rec node f s + else + match node.data with + | Some x -> + consume s node; + let () = f x in + iter_rec node.next f s + | None -> Lwt.return_unit + +let iter f s = iter_rec s.node f s + +let rec iter_s_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> iter_s_rec node f s + else + match node.data with + | Some x -> + consume s node; + f x >>= fun () -> iter_s_rec node.next f s + | None -> Lwt.return_unit + +let iter_s f s = iter_s_rec s.node f s + +let rec iter_p_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> iter_p_rec node f s + else + match node.data with + | Some x -> + consume s node; + let res = f x in + let rest = iter_p_rec node.next f s in + res >>= fun () -> rest + | None -> Lwt.return_unit + +let iter_p f s = iter_p_rec s.node f s + +let iter_n ?(max_concurrency = 1) f stream = + (if max_concurrency <= 0 + then + let message = + Printf.sprintf "Lwt_stream.iter_n: max_concurrency must be > 0, %d given" + max_concurrency + in + invalid_arg message); + let rec loop running available = + (if available > 0 + then Lwt.return (running, available) + else + Lwt.nchoose_split running >>= fun (complete, running) -> + Lwt.return (running, available + List.length complete)) + >>= fun (running, available) -> + get stream >>= function + | None -> Lwt.join running + | Some elt -> loop (f elt :: running) (pred available) + in + loop [] max_concurrency + +let rec find_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> find_rec node f s + else + match node.data with + | Some x as opt -> + consume s node; + let test = f x in + if test then Lwt.return opt else find_rec node.next f s + | None -> Lwt.return_none + +let find f s = find_rec s.node f s + +let rec find_s_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> find_s_rec node f s + else + match node.data with + | Some x as opt -> ( + consume s node; + f x >>= function + | true -> Lwt.return opt + | false -> find_s_rec node.next f s) + | None -> Lwt.return_none + +let find_s f s = find_s_rec s.node f s + +let rec find_map_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> find_map_rec node f s + else + match node.data with + | Some x -> + consume s node; + let x = f x in + if x = None then find_map_rec node.next f s else Lwt.return x + | None -> Lwt.return_none + +let find_map f s = find_map_rec s.node f s + +let rec find_map_s_rec node f s = + if node == !(s.last) + then feed s >>= fun () -> find_map_s_rec node f s + else + match node.data with + | Some x -> ( + consume s node; + let t = f x in + t >>= function None -> find_map_s_rec node.next f s | Some _ -> t) + | None -> Lwt.return_none + +let find_map_s f s = find_map_s_rec s.node f s + +let combine s1 s2 = + let next () = + let t1 = get s1 and t2 = get s2 in + t1 >>= fun n1 -> + t2 >>= fun n2 -> + match n1, n2 with + | Some x1, Some x2 -> Lwt.return (Some (x1, x2)) + | _ -> Lwt.return_none + in + from next + +let append s1 s2 = + let current_s = ref s1 in + let rec next () = + let t = get !current_s in + t >>= function + | Some _ -> t + | None -> + if !current_s == s2 + then Lwt.return_none + else ( + current_s := s2; + next ()) + in + from next + +let concat s_top = + let current_s = ref (from (fun () -> Lwt.return_none)) in + let rec next () = + let t = get !current_s in + t >>= function + | Some _ -> t + | None -> ( + get s_top >>= function + | Some s -> + current_s := s; + next () + | None -> Lwt.return_none) + in + from next + +let choose streams = + let source s = s, get s >|= fun x -> s, x in + let streams = ref (List.map source streams) in + let rec next () = + match !streams with + | [] -> Lwt.return_none + | l -> ( + Lwt.choose (List.map snd l) >>= fun (s, x) -> + let l = List.remove_assq s l in + match x with + | Some _ -> + streams := source s :: l; + Lwt.return x + | None -> + streams := l; + next ()) + in + from next + +let parse s f = + (match s.source with + | Push_bounded _ -> invalid_arg "Lwt_stream.parse" + | From _ | From_direct _ | Push _ -> ()); + let node = s.node in + Lwt.catch + (fun () -> f s) + (fun exn -> + s.node <- node; + Lwt.reraise exn) + +let hexdump stream = + let buf = Buffer.create 80 and num = ref 0 in + from (fun _ -> + nget 16 stream >>= function + | [] -> Lwt.return_none + | l -> + Buffer.clear buf; + Printf.bprintf buf "%08x| " !num; + num := !num + 16; + let rec bytes pos = function + | [] -> blanks pos + | x :: l -> + if pos = 8 then Buffer.add_char buf ' '; + Printf.bprintf buf "%02x " (Char.code x); + bytes (pos + 1) l + and blanks pos = + if pos < 16 + then ( + if pos = 8 + then Buffer.add_string buf " " + else Buffer.add_string buf " "; + blanks (pos + 1)) + in + bytes 0 l; + Buffer.add_string buf " |"; + List.iter + (fun ch -> + Buffer.add_char buf + (if ch >= '\x20' && ch <= '\x7e' then ch else '.')) + l; + Buffer.add_char buf '|'; + Lwt.return (Some (Buffer.contents buf))) diff --git a/src/lib/eliom_stream.shared.mli b/src/lib/eliom_stream.shared.mli new file mode 100644 index 0000000000..6a4c632e08 --- /dev/null +++ b/src/lib/eliom_stream.shared.mli @@ -0,0 +1,400 @@ +(* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *) + +(** Data streams *) + +type 'a t +(** A stream holding values of type ['a]. + + Naming convention: in this module, all functions applying a function + to each element of a stream are suffixed by: + + - [_s] when the function returns a thread and calls are serialised + - [_p] when the function returns a thread and calls are parallelised *) + +(** {2 Construction} *) + +val from : (unit -> 'a option Lwt.t) -> 'a t +(** [from f] creates a stream from the given input function. [f] is + called each time more input is needed, and the stream ends when + [f] returns [None]. + + If [f], or the thread produced by [f], raises an exception, that exception + is forwarded to the consumer of the stream (for example, a caller of + {!get}). Note that this does not end the stream. A subsequent attempt to + read from the stream will cause another call to [f], which may succeed + with a value. *) + +val from_direct : (unit -> 'a option) -> 'a t +(** [from_direct f] does the same as {!from} but with a function + that does not return a thread. It is preferred that this + function be used rather than wrapping [f] into a function which + returns a thread. + + The behavior when [f] raises an exception is the same as for {!from}, + except that [f] does not produce a thread. *) + +exception Closed +(** Exception raised by the push function of a push-stream when + pushing an element after the end of stream ([= None]) has been + pushed. *) + +val create : unit -> 'a t * ('a option -> unit) +(** [create ()] returns a new stream and a push function. + + To notify the stream's consumer of errors, either use a separate + communication channel, or use a {!Stdlib.result} stream. There is + no way to push an exception into a push-stream. *) + +val create_with_reference : unit -> 'a t * ('a option -> unit) * ('b -> unit) +(** [create_with_reference ()] returns a new stream and a push + function. The last function allows a reference to be set to an + external source. This prevents the external source from being + garbage collected. + + For example, to convert a reactive event to a stream: + + {[ + let stream, push, set_ref = Lwt_stream.create_with_reference () in + set_ref (map_event push event) + ]} +*) + +exception Full +(** Exception raised by the push function of a bounded push-stream + when the stream queue is full and a thread is already waiting to + push an element. *) + +(** Type of sources for bounded push-streams. *) +class type ['a] bounded_push = object + method size : int + (** Size of the stream. *) + + method resize : int -> unit + (** Change the size of the stream queue. Note that the new size + can smaller than the current stream queue size. + + It raises {!Stdlib.Invalid_argument} if [size < 0]. *) + + method push : 'a -> unit Lwt.t + (** Pushes a new element to the stream. If the stream is full then + it will block until one element is consumed. If another thread + is already blocked on [push], it raises {!Lwt_stream.Full}. *) + + method close : unit + (** Closes the stream. Any thread currently blocked on a call to + the [push] method fails with {!Lwt_stream.Closed}. *) + + method count : int + (** Number of elements in the stream queue. *) + + method blocked : bool + (** Is a thread is blocked on a call to the [push] method? *) + + method closed : bool + (** Is the stream closed? *) + + method set_reference : 'a. 'a -> unit + (** Set the reference to an external source. *) +end + +val create_bounded : int -> 'a t * 'a bounded_push +(** [create_bounded size] returns a new stream and a bounded push + source. The stream can hold a maximum of [size] elements. When + this limit is reached, pushing a new element will block until + one is consumed. + + Note that you cannot clone or parse (with {!parse}) a bounded + stream. These functions will raise [Invalid_argument] if you try + to do so. + + It raises [Invalid_argument] if [size < 0]. *) + +val return : 'a -> 'a t +(** [return a] creates a stream containing the value [a] and being immediately + closed stream (in the sense of {!is_closed}). + + @since 5.5.0 *) + +val return_lwt : 'a Lwt.t -> 'a t +(** [return_lwt l] creates a stream returning the value that [l] resolves to. + The value is pushed into the stream immediately after the promise becomes + resolved and the stream is then immediately closed (in the sense of + {!is_closed}). + + If, instead, [l] becomes rejected, then the stream is closed without any + elements in it. Attempting to fetch elements from it will raise {!Empty}. + + @since 5.5.0 *) + +val of_seq : 'a Seq.t -> 'a t +(** [of_seq s] creates a stream returning all elements of [s]. The elements are + evaluated from [s] and pushed onto the stream as the stream is consumed. + + @since 4.2.0 *) + +val of_lwt_seq : 'a Lwt_seq.t -> 'a t +(** [of_lwt_seq s] creates a stream returning all elements of [s]. The elements + are evaluated from [s] and pushed onto the stream as the stream is consumed. + + @since 5.5.0 *) + +val of_list : 'a list -> 'a t +(** [of_list l] creates a stream returning all elements of [l]. The elements are + pushed into the stream immediately, resulting in a closed stream (in the + sense of {!is_closed}). *) + +val of_array : 'a array -> 'a t +(** [of_array a] creates a stream returning all elements of [a]. The elements + are pushed into the stream immediately, resulting in a closed stream (in the + sense of {!is_closed}). *) + +val of_string : string -> char t +(** [of_string str] creates a stream returning all characters of [str]. The + characters are pushed into the stream immediately, resulting in a closed + stream (in the sense of {!is_closed}). *) + +val clone : 'a t -> 'a t +(** [clone st] clone the given stream. Operations on each stream + will not affect the other. + + For example: + + {[ + # let st1 = Lwt_stream.of_list [1; 2; 3];; + val st1 : int Lwt_stream.t = + # let st2 = Lwt_stream.clone st1;; + val st2 : int Lwt_stream.t = + # lwt x = Lwt_stream.next st1;; + val x : int = 1 + # lwt y = Lwt_stream.next st2;; + val y : int = 1 + ]} + + It raises [Invalid_argument] if [st] is a bounded + push-stream. *) + +(** {2 Destruction} *) + +val to_list : 'a t -> 'a list Lwt.t +(** Returns the list of elements of the given stream *) + +val to_string : char t -> string Lwt.t +(** Returns the word composed of all characters of the given + stream *) + +(** {2 Data retrieval} *) + +exception Empty +(** Exception raised when trying to retrieve data from an empty + stream. *) + +val peek : 'a t -> 'a option Lwt.t +(** [peek st] returns the first element of the stream, if any, + without removing it. *) + +val npeek : int -> 'a t -> 'a list Lwt.t +(** [npeek n st] returns at most the first [n] elements of [st], + without removing them. *) + +val get : 'a t -> 'a option Lwt.t +(** [get st] removes and returns the first element of the stream, if + any. *) + +val nget : int -> 'a t -> 'a list Lwt.t +(** [nget n st] removes and returns at most the first [n] elements of + [st]. *) + +val get_while : ('a -> bool) -> 'a t -> 'a list Lwt.t + +val get_while_s : ('a -> bool Lwt.t) -> 'a t -> 'a list Lwt.t +(** [get_while f st] returns the longest prefix of [st] where all + elements satisfy [f]. *) + +val next : 'a t -> 'a Lwt.t +(** [next st] removes and returns the next element of the stream or + fails with {!Empty}, if the stream is empty. *) + +val last_new : 'a t -> 'a Lwt.t +(** [last_new st] returns the last element that can be obtained + without sleeping, or wait for one if none is available. + + It fails with {!Empty} if the stream has no more elements. *) + +val junk : 'a t -> unit Lwt.t +(** [junk st] removes the first element of [st]. *) + +val njunk : int -> 'a t -> unit Lwt.t +(** [njunk n st] removes at most the first [n] elements of the + stream. *) + +val junk_while : ('a -> bool) -> 'a t -> unit Lwt.t + +val junk_while_s : ('a -> bool Lwt.t) -> 'a t -> unit Lwt.t +(** [junk_while f st] removes all elements at the beginning of the + streams which satisfy [f]. *) + +val junk_available : 'a t -> unit +(** [junk_available st] removes all elements that are ready to be read + without yielding from [st]. *) + +val get_available : 'a t -> 'a list +(** [get_available st] returns all available elements of [l] without + blocking. *) + +val get_available_up_to : int -> 'a t -> 'a list +(** [get_available_up_to n st] returns up to [n] elements of [l] + without blocking. *) + +val is_empty : 'a t -> bool Lwt.t +(** [is_empty st] returns whether the given stream is empty. *) + +val is_closed : 'a t -> bool +(** [is_closed st] returns whether the given stream has been closed. A closed + stream is not necessarily empty. It may still contain unread elements. If + [is_closed s = true], then all subsequent reads until the end of the + stream are guaranteed not to block. + + @since 2.6.0 *) + +val closed : 'a t -> unit Lwt.t +(** [closed st] returns a thread that will sleep until the stream has been + closed. + + @since 2.6.0 *) + +(** {3 Deprecated} *) + +val junk_old : 'a t -> unit Lwt.t +[@@deprecated "Use junk_available instead"] +(** @deprecated [junk_old st] is [Lwt.return (junk_available st)]. *) + +(** {2 Stream transversal} *) + +(** Note: all the following functions are destructive. + + For example: + + {[ + # let st1 = Lwt_stream.of_list [1; 2; 3];; + val st1 : int Lwt_stream.t = + # let st2 = Lwt_stream.map string_of_int st1;; + val st2 : string Lwt_stream.t = + # lwt x = Lwt_stream.next st1;; + val x : int = 1 + # lwt y = Lwt_stream.next st2;; + val y : string = "2" + ]} +*) + +val choose : 'a t list -> 'a t +(** [choose l] creates an stream from a list of streams. The + resulting stream will return elements returned by any stream of + [l] in an unspecified order. *) + +val map : ('a -> 'b) -> 'a t -> 'b t + +val map_s : ('a -> 'b Lwt.t) -> 'a t -> 'b t +(** [map f st] maps the value returned by [st] with [f] *) + +val filter : ('a -> bool) -> 'a t -> 'a t + +val filter_s : ('a -> bool Lwt.t) -> 'a t -> 'a t +(** [filter f st] keeps only values, [x], such that [f x] is [true] *) + +val filter_map : ('a -> 'b option) -> 'a t -> 'b t + +val filter_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b t +(** [filter_map f st] filter and map [st] at the same time *) + +val map_list : ('a -> 'b list) -> 'a t -> 'b t + +val map_list_s : ('a -> 'b list Lwt.t) -> 'a t -> 'b t +(** [map_list f st] applies [f] on each element of [st] and flattens + the lists returned *) + +val fold : ('a -> 'b -> 'b) -> 'a t -> 'b -> 'b Lwt.t + +val fold_s : ('a -> 'b -> 'b Lwt.t) -> 'a t -> 'b -> 'b Lwt.t +(** [fold f s x] fold_like function for streams. *) + +val iter : ('a -> unit) -> 'a t -> unit Lwt.t +val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + +val iter_s : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t +(** [iter f s] iterates over all elements of the stream. *) + +val iter_n : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t +(** [iter_n ?max_concurrency f s] iterates over all elements of the stream [s]. + Iteration is performed concurrently with up to [max_threads] concurrent + instances of [f]. + + Iteration is {b not} guaranteed to be in order as this function will + attempt to always process [max_concurrency] elements from [s] at once. + + @param max_concurrency defaults to [1]. + @raise Invalid_argument if [max_concurrency < 1]. + @since 3.3.0 *) + +val find : ('a -> bool) -> 'a t -> 'a option Lwt.t + +val find_s : ('a -> bool Lwt.t) -> 'a t -> 'a option Lwt.t +(** [find f s] find an element in a stream. *) + +val find_map : ('a -> 'b option) -> 'a t -> 'b option Lwt.t + +val find_map_s : ('a -> 'b option Lwt.t) -> 'a t -> 'b option Lwt.t +(** [find_map f s] find and map at the same time. *) + +val combine : 'a t -> 'b t -> ('a * 'b) t +(** [combine s1 s2] combines two streams. The stream will end when + either stream ends. *) + +val append : 'a t -> 'a t -> 'a t +(** [append s1 s2] returns a stream which returns all elements of + [s1], then all elements of [s2] *) + +val concat : 'a t t -> 'a t +(** [concat st] returns the concatenation of all streams of [st]. *) + +val flatten : 'a list t -> 'a t +(** [flatten st = map_list (fun l -> l) st] *) + +val wrap_exn : 'a t -> ('a, exn) result t +(** [wrap_exn s] is a stream [s'] such that each time [s] yields a value [v], + [s'] yields [Result.Ok v], and when the source of [s] raises an exception + [e], [s'] yields [Result.Error e]. + + Note that push-streams (as returned by {!create}) never raise exceptions. + + If the stream source keeps raising the same exception [e] each time the + stream is read, [s'] is unbounded. Reading it will produce [Result.Error e] + indefinitely. + + @since 2.7.0 *) + +(** {2 Parsing} *) + +val parse : 'a t -> ('a t -> 'b Lwt.t) -> 'b Lwt.t +(** [parse st f] parses [st] with [f]. If [f] raise an exception, + [st] is restored to its previous state. + + It raises [Invalid_argument] if [st] is a bounded + push-stream. *) + +(** {2 Misc} *) + +val hexdump : char t -> string t +(** [hexdump byte_stream] returns a stream which is the same as the + output of [hexdump -C]. + + Basically, here is a simple implementation of [hexdump -C]: + + {[ + let () = Lwt_main.run begin + Lwt_io.write_lines + Lwt_io.stdout + (Lwt_stream.hexdump (Lwt_io.read_lines Lwt_io.stdin)) + end + ]} +*) From 4e49c2a3719a91a05c8c5fb55ec60b50035abf67 Mon Sep 17 00:00:00 2001 From: Vincent Balat Date: Wed, 27 Aug 2025 15:28:31 +0200 Subject: [PATCH 02/21] ocamlformating --- src/lib/eliom_client_main.eliom | 2 +- src/lib/eliom_comet.server.ml | 3 +- src/lib/eliom_content.eliom | 4 +- src/lib/eliom_cscache.eliom | 1 - src/lib/eliom_form.eliom | 180 +++++++++++++++++++++++----- src/lib/eliom_form.eliomi | 2 +- src/lib/eliom_service_base.eliom | 43 ++++--- src/lib/eliom_shared.eliom | 140 ++++++++++++++-------- src/lib/eliom_shared_content.eliom | 4 +- src/lib/eliom_shared_content.eliomi | 2 +- src/lib/eliom_tools.eliom | 19 ++- src/lib/eliom_tools.eliomi | 2 +- 12 files changed, 294 insertions(+), 108 deletions(-) diff --git a/src/lib/eliom_client_main.eliom b/src/lib/eliom_client_main.eliom index 3328cebfc9..6049674d32 100644 --- a/src/lib/eliom_client_main.eliom +++ b/src/lib/eliom_client_main.eliom @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) let%client _ = Eliom_client.init () diff --git a/src/lib/eliom_comet.server.ml b/src/lib/eliom_comet.server.ml index 73e890c6c9..930dc50de9 100644 --- a/src/lib/eliom_comet.server.ml +++ b/src/lib/eliom_comet.server.ml @@ -598,7 +598,8 @@ end = struct try Hashtbl.find handler_ref_table scope_hierarchy with Not_found -> let eref = - Eliom_reference.Volatile.eref ~scope:(`Client_process scope_hierarchy) None + Eliom_reference.Volatile.eref ~scope:(`Client_process scope_hierarchy) + None in Hashtbl.add handler_ref_table scope_hierarchy eref; eref diff --git a/src/lib/eliom_content.eliom b/src/lib/eliom_content.eliom index 08bc84fb17..249a620540 100644 --- a/src/lib/eliom_content.eliom +++ b/src/lib/eliom_content.eliom @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) [%%shared type boxed] @@ -55,7 +55,7 @@ module Svg = struct let client_boxed = ~%client_boxed in let real = Svg.To_dom.of_element (unboxed client_boxed) in Js.Opt.iter dummy_dom##.parentNode (fun parent -> - parent ## (replaceChild real dummy_dom)) + parent##(replaceChild real dummy_dom)) : unit)] in init diff --git a/src/lib/eliom_cscache.eliom b/src/lib/eliom_cscache.eliom index 5a7287b3fc..1262fb0899 100644 --- a/src/lib/eliom_cscache.eliom +++ b/src/lib/eliom_cscache.eliom @@ -37,7 +37,6 @@ let%server find cache get_data id = with Not_found -> let th = let* v = get_data id in - ignore [%client.unsafe (do_cache ~%cache ~%id ~%v : unit)]; Lwt.return v in diff --git a/src/lib/eliom_form.eliom b/src/lib/eliom_form.eliom index 5d75aee819..b15d39df86 100644 --- a/src/lib/eliom_form.eliom +++ b/src/lib/eliom_form.eliom @@ -16,10 +16,12 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) open%shared Js_of_ocaml + [%%client.start] + open Lwt.Syntax let read_params form y = @@ -100,8 +102,17 @@ let get_xhr = function module Make_links (Html : Html) = struct type +'a attrib = 'a Html.attrib - let make_uri ?absolute ?absolute_path ?https ~service ?hostname ?port - ?fragment ?keep_nl_params ?nl_params gp + let make_uri + ?absolute + ?absolute_path + ?https + ~service + ?hostname + ?port + ?fragment + ?keep_nl_params + ?nl_params + gp = Html.uri_of_fun @@ fun () -> Eliom_uri.make_string_uri ?absolute ?absolute_path ?https ?fragment ~service @@ -109,8 +120,20 @@ module Make_links (Html : Html) = struct let uri_of_string = Html.uri_of_fun - let a ?absolute ?absolute_path ?https ?(a = []) ~service ?hostname ?port - ?fragment ?keep_nl_params ?nl_params ?xhr content getparams + let a + ?absolute + ?absolute_path + ?https + ?(a = []) + ~service + ?hostname + ?port + ?fragment + ?keep_nl_params + ?nl_params + ?xhr + content + getparams = let a = let a = (a :> Html_types.a_attrib attrib list) in @@ -230,9 +253,20 @@ module Make (Html : Html) = struct let make_post_uri_components = Eliom_uri.make_post_uri_components - let get_form_ bind return ?absolute ?absolute_path ?https ?a ~service - ?hostname ?port ?fragment - ?(nl_params = Eliom_parameter.empty_nl_params_set) ?keep_nl_params f + let get_form_ + bind + return + ?absolute + ?absolute_path + ?https + ?a + ~service + ?hostname + ?port + ?fragment + ?(nl_params = Eliom_parameter.empty_nl_params_set) + ?keep_nl_params + f = let issuffix, paramnames = Eliom_parameter.make_params_names (Eliom_service.get_params_type service) @@ -275,8 +309,19 @@ module Make (Html : Html) = struct in return (Html.lazy_form ~a inside) - let get_form ?absolute ?absolute_path ?https ?a ~service ?hostname ?port - ?fragment ?keep_nl_params ?nl_params ?xhr:_ f + let get_form + ?absolute + ?absolute_path + ?https + ?a + ~service + ?hostname + ?port + ?fragment + ?keep_nl_params + ?nl_params + ?xhr:_ + f = get_form_ (fun x f -> f x) @@ -284,11 +329,22 @@ module Make (Html : Html) = struct ?absolute ?absolute_path ?https ?a ~service ?keep_nl_params ?nl_params ?hostname ?port ?fragment f - let post_form_ bind return ?absolute ?absolute_path ?https ?a ~service - ?hostname ?port ?fragment - ?(nl_params = Eliom_parameter.empty_nl_params_set) - ?(keep_nl_params : [`All | `Persistent | `None] option) - ?keep_get_na_params f get_params + let post_form_ + bind + return + ?absolute + ?absolute_path + ?https + ?a + ~service + ?hostname + ?port + ?fragment + ?(nl_params = Eliom_parameter.empty_nl_params_set) + ?(keep_nl_params : [`All | `Persistent | `None] option) + ?keep_get_na_params + f + get_params = let _, paramnames = Eliom_parameter.make_params_names (Eliom_service.post_params_type service) @@ -316,9 +372,21 @@ module Make (Html : Html) = struct in return (make_post_form ?a ~action inside) - let post_form ?absolute ?absolute_path ?https ?a ~service ?hostname ?port - ?fragment ?keep_nl_params ?keep_get_na_params ?nl_params ?xhr:_ f - getparams + let post_form + ?absolute + ?absolute_path + ?https + ?a + ~service + ?hostname + ?port + ?fragment + ?keep_nl_params + ?keep_get_na_params + ?nl_params + ?xhr:_ + f + getparams = post_form_ (fun x f -> f x) @@ -402,8 +470,14 @@ module Make (Html : Html) = struct * 'a soption list | Option of 'a soption - let gen_select ?a ?(multiple = false) ?required ~name (fl : 'a select_opt) - (ol : 'a select_opt list) string_of + let gen_select + ?a + ?(multiple = false) + ?required + ~name + (fl : 'a select_opt) + (ol : 'a select_opt list) + string_of = let a = (a :> Html_types.select_attrib attrib list option) in let a = @@ -512,8 +586,19 @@ module Make (Html : Html) = struct let a_onsubmit_service info = Html.attrib_of_service "onsubmit" info - let get_form ?absolute ?absolute_path ?https ?(a = []) ~service ?hostname - ?port ?fragment ?keep_nl_params ?nl_params ?xhr contents + let get_form + ?absolute + ?absolute_path + ?https + ?(a = []) + ~service + ?hostname + ?port + ?fragment + ?keep_nl_params + ?nl_params + ?xhr + contents = let a = let a = (a :> Html_types.form_attrib attrib list) in @@ -529,8 +614,19 @@ module Make (Html : Html) = struct get_form ?absolute ?absolute_path ?https ~a ~service ?hostname ?port ?fragment ?keep_nl_params ?nl_params contents - let lwt_get_form ?absolute ?absolute_path ?https ?(a = []) ~service ?hostname - ?port ?fragment ?keep_nl_params ?nl_params ?xhr contents + let lwt_get_form + ?absolute + ?absolute_path + ?https + ?(a = []) + ~service + ?hostname + ?port + ?fragment + ?keep_nl_params + ?nl_params + ?xhr + contents = let a = let a = (a :> Html_types.form_attrib attrib list) in @@ -546,9 +642,21 @@ module Make (Html : Html) = struct get_form_ Lwt.bind Lwt.return ?absolute ?absolute_path ?https ~a ~service ?hostname ?port ?fragment ?nl_params ?keep_nl_params contents - let post_form ?absolute ?absolute_path ?https ?(a = []) ~service ?hostname - ?port ?fragment ?keep_nl_params ?keep_get_na_params ?nl_params ?xhr - contents getparams + let post_form + ?absolute + ?absolute_path + ?https + ?(a = []) + ~service + ?hostname + ?port + ?fragment + ?keep_nl_params + ?keep_get_na_params + ?nl_params + ?xhr + contents + getparams = let a = let a = (a :> Html_types.form_attrib attrib list) in @@ -566,9 +674,21 @@ module Make (Html : Html) = struct ?fragment ?keep_nl_params ?keep_get_na_params ?nl_params contents getparams - let lwt_post_form ?absolute ?absolute_path ?https ?(a = []) ~service ?hostname - ?port ?fragment ?keep_nl_params ?keep_get_na_params ?nl_params ?xhr - contents getparams + let lwt_post_form + ?absolute + ?absolute_path + ?https + ?(a = []) + ~service + ?hostname + ?port + ?fragment + ?keep_nl_params + ?keep_get_na_params + ?nl_params + ?xhr + contents + getparams = let a = let a = (a :> Html_types.form_attrib attrib list) in diff --git a/src/lib/eliom_form.eliomi b/src/lib/eliom_form.eliomi index 674b562012..9998309c5e 100644 --- a/src/lib/eliom_form.eliomi +++ b/src/lib/eliom_form.eliomi @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) [%%client.start] diff --git a/src/lib/eliom_service_base.eliom b/src/lib/eliom_service_base.eliom index bfe84a5b3e..9e0014e5b7 100644 --- a/src/lib/eliom_service_base.eliom +++ b/src/lib/eliom_service_base.eliom @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) (* Manipulation of services - this code can be use on server or client side. *) @@ -27,18 +27,17 @@ module Url = Eliom_lib.Url type suff = [`WithSuffix | `WithoutSuffix] -let params_of_meth : - type m gp gn pp pn x. - (m, gp, gn, pp, pn, 'tipo, x) meth - -> (gp, 'tipo, gn) params * (pp, [`WithoutSuffix], pn) params +let params_of_meth : type m gp gn pp pn x. + (m, gp, gn, pp, pn, 'tipo, x) meth + -> (gp, 'tipo, gn) params * (pp, [`WithoutSuffix], pn) params = function | Get gp -> gp, Eliom_parameter.unit | Post (gp, pp) -> gp, pp | Put gp -> gp, Eliom_parameter.raw_post_data | Delete gp -> gp, Eliom_parameter.raw_post_data -let which_meth_internal : - type m gp gn pp pn tipo x. (m, gp, gn, pp, pn, tipo, x) meth -> m which_meth +let which_meth_internal : type m gp gn pp pn tipo x. + (m, gp, gn, pp, pn, tipo, x) meth -> m which_meth = function | Get _ -> Get' | Post _ -> Post' @@ -75,7 +74,7 @@ type att = type non_att = { na_name : Eliom_common.na_key_serv ; keep_get_na_params : bool - (* bool is used only for post and means "keep_get_na_params": do we + (* bool is used only for post and means "keep_get_na_params": do we keep GET non-attached parameters in links (if any) (31/12/2007 - experimental - WAS: 'a, but may be removed (was not used)) *) } @@ -123,7 +122,7 @@ type ('get (string * Eliommod_parameters.param) list Eliom_lib.String.Table.t (* non localized parameters *) * (string * Eliommod_parameters.param) list - (* regular parameters *) + (* regular parameters *) ; get_params_type : ('get, 'tipo, 'getnames) Eliom_parameter.params_type ; post_params_type : ('post, [`WithoutSuffix], 'postnames) Eliom_parameter.params_type @@ -140,13 +139,13 @@ type ('get ; (* force https *) keep_nl_params : [`All | `Persistent | `None] ; mutable send_appl_content : send_appl_content - (* XNever when we create the service, then changed at registration + (* XNever when we create the service, then changed at registration :/ *) ; (* If the service has a client-side implementation, we put the generating function here: *) mutable client_fun : ('get -> 'post -> result Lwt.t) option ref Eliom_client_value.t option - (* The function is in a client-side reference, so that it is shared + (* The function is in a client-side reference, so that it is shared by all occurrences of the service sent from the server. For some service, we cannot create the client value immediately; this is done later on using [internal_set_client_fun]. *) @@ -208,8 +207,9 @@ let timeout s = s.timeout let https s = s.https let priority s = s.priority -let internal_set_client_fun ~service - (f : ('get -> 'post -> result Lwt.t) Eliom_client_value.t) +let internal_set_client_fun + ~service + (f : ('get -> 'post -> result Lwt.t) Eliom_client_value.t) = service.client_fun <- Some [%client.unsafe ref (Some ~%f)] @@ -473,9 +473,20 @@ let%client no_client_fun () : _ ref Eliom_client_value.t option = Some (ref None) (** Create a main service (not a coservice), internal or external *) -let main_service ~https ~prefix ~(path : Url.path) ?force_site_dir ~kind ~meth - ?(redirect_suffix = true) ?(keep_nl_params = `None) - ?(priority = default_priority) ~get_params ~post_params ~reload_fun () +let main_service + ~https + ~prefix + ~(path : Url.path) + ?force_site_dir + ~kind + ~meth + ?(redirect_suffix = true) + ?(keep_nl_params = `None) + ?(priority = default_priority) + ~get_params + ~post_params + ~reload_fun + () = { pre_applied_parameters = Eliom_lib.String.Table.empty, [] ; get_params_type = get_params diff --git a/src/lib/eliom_shared.eliom b/src/lib/eliom_shared.eliom index 14f33aef9c..8156139a85 100644 --- a/src/lib/eliom_shared.eliom +++ b/src/lib/eliom_shared.eliom @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) [%%shared open Lwt.Syntax @@ -383,17 +383,22 @@ module React = struct (FakeReact.S.map (Value.local f) (Value.local s)) [%client.unsafe (FakeReact.S.map ?eq:~%eq ~%f ~%s : 'b FakeReact.S.t)] - let fmap ?(eq : ('b -> 'b -> bool) Value.t option) - (f : ('a -> 'b option) Value.t) (i : 'b Value.t) - (s : 'a FakeReact.S.t Value.t) : 'b t + let fmap + ?(eq : ('b -> 'b -> bool) Value.t option) + (f : ('a -> 'b option) Value.t) + (i : 'b Value.t) + (s : 'a FakeReact.S.t Value.t) : 'b t = Value.create (FakeReact.S.fmap (Value.local f) (Value.local i) (Value.local s)) [%client.unsafe (FakeReact.S.fmap ?eq:~%eq ~%f ~%i ~%s : 'b FakeReact.S.t)] - let merge ?eq (f : ('a -> 'b -> 'a) Value.t) (acc : 'a) - (l : 'b FakeReact.S.t Value.t list) : 'a t + let merge + ?eq + (f : ('a -> 'b -> 'a) Value.t) + (acc : 'a) + (l : 'b FakeReact.S.t Value.t list) : 'a t = Value.create (FakeReact.S.merge (Value.local f) acc (List.map Value.local l)) @@ -405,16 +410,22 @@ module React = struct (FakeReact.S.const ~synced:true v) [%client.unsafe (React.S.const ~%v : 'a FakeReact.S.t)] - let l2 ?eq (f : ('a -> 'b -> 'c) Value.t) (s1 : 'a FakeReact.S.t Value.t) - (s2 : 'b FakeReact.S.t Value.t) : 'c t + let l2 + ?eq + (f : ('a -> 'b -> 'c) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) : 'c t = Value.create (FakeReact.S.l2 (Value.local f) (Value.local s1) (Value.local s2)) [%client.unsafe (React.S.l2 ?eq:~%eq ~%f ~%s1 ~%s2 : 'd FakeReact.S.t)] - let l3 ?eq (f : ('a -> 'b -> 'c -> 'd) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) - (s3 : 'c FakeReact.S.t Value.t) : 'd t + let l3 + ?eq + (f : ('a -> 'b -> 'c -> 'd) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) + (s3 : 'c FakeReact.S.t Value.t) : 'd t = Value.create (FakeReact.S.l3 (Value.local f) (Value.local s1) (Value.local s2) @@ -422,9 +433,13 @@ module React = struct [%client.unsafe (React.S.l3 ?eq:~%eq ~%f ~%s1 ~%s2 ~%s3 : 'd FakeReact.S.t)] - let l4 ?eq (f : ('a -> 'b -> 'c -> 'd -> 'e) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) - (s3 : 'c FakeReact.S.t Value.t) (s4 : 'd FakeReact.S.t Value.t) : 'e t + let l4 + ?eq + (f : ('a -> 'b -> 'c -> 'd -> 'e) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) + (s3 : 'c FakeReact.S.t Value.t) + (s4 : 'd FakeReact.S.t Value.t) : 'e t = Value.create (FakeReact.S.l4 (Value.local f) (Value.local s1) (Value.local s2) @@ -432,10 +447,14 @@ module React = struct [%client.unsafe (React.S.l4 ?eq:~%eq ~%f ~%s1 ~%s2 ~%s3 ~%s4 : 'e FakeReact.S.t)] - let l5 ?eq (f : ('a -> 'b -> 'c -> 'd -> 'e -> 'f) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) - (s3 : 'c FakeReact.S.t Value.t) (s4 : 'd FakeReact.S.t Value.t) - (s5 : 'e FakeReact.S.t Value.t) : 'f t + let l5 + ?eq + (f : ('a -> 'b -> 'c -> 'd -> 'e -> 'f) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) + (s3 : 'c FakeReact.S.t Value.t) + (s4 : 'd FakeReact.S.t Value.t) + (s5 : 'e FakeReact.S.t Value.t) : 'f t = Value.create (FakeReact.S.l5 (Value.local f) (Value.local s1) (Value.local s2) @@ -443,10 +462,15 @@ module React = struct [%client.unsafe (React.S.l5 ?eq:~%eq ~%f ~%s1 ~%s2 ~%s3 ~%s4 ~%s5 : 'f FakeReact.S.t)] - let l6 ?eq (f : ('a -> 'b -> 'c -> 'd -> 'e -> 'f -> 'g) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) - (s3 : 'c FakeReact.S.t Value.t) (s4 : 'd FakeReact.S.t Value.t) - (s5 : 'e FakeReact.S.t Value.t) (s6 : 'f FakeReact.S.t Value.t) : 'g t + let l6 + ?eq + (f : ('a -> 'b -> 'c -> 'd -> 'e -> 'f -> 'g) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) + (s3 : 'c FakeReact.S.t Value.t) + (s4 : 'd FakeReact.S.t Value.t) + (s5 : 'e FakeReact.S.t Value.t) + (s6 : 'f FakeReact.S.t Value.t) : 'g t = Value.create (FakeReact.S.l6 (Value.local f) (Value.local s1) (Value.local s2) @@ -472,8 +496,10 @@ module React = struct end module Lwt = struct - let map_s ?eq (f : ('a -> 'b Lwt.t) Value.t) - (s : 'a FakeReact.S.t Value.t) : 'b t Lwt.t + let map_s + ?eq + (f : ('a -> 'b Lwt.t) Value.t) + (s : 'a FakeReact.S.t Value.t) : 'b t Lwt.t = let s' = Value.local s in let* server_result = (Value.local f) (FakeReact.S.value s') in @@ -485,9 +511,11 @@ module React = struct (React.S.Lwt.map_s_init ~init:~%server_result ?eq:~%eq ~%f ~%s : 'b FakeReact.S.t)]) - let l2_s ?eq (f : ('a -> 'b -> 'c Lwt.t) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) : - 'c t Lwt.t + let l2_s + ?eq + (f : ('a -> 'b -> 'c Lwt.t) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) : 'c t Lwt.t = let s1' = Value.local s1 and s2' = Value.local s2 in let* server_result = @@ -502,9 +530,12 @@ module React = struct ~%s2 : 'c FakeReact.S.t)]) - let l3_s ?eq (f : ('a -> 'b -> 'c -> 'd Lwt.t) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) - (s3 : 'c FakeReact.S.t Value.t) : 'd t Lwt.t + let l3_s + ?eq + (f : ('a -> 'b -> 'c -> 'd Lwt.t) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) + (s3 : 'c FakeReact.S.t Value.t) : 'd t Lwt.t = let s1' = Value.local s1 and s2' = Value.local s2 @@ -522,10 +553,13 @@ module React = struct ~%s2 ~%s3 : 'd FakeReact.S.t)]) - let l4_s ?eq (f : ('a -> 'b -> 'c -> 'd -> 'e Lwt.t) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) - (s3 : 'c FakeReact.S.t Value.t) (s4 : 'd FakeReact.S.t Value.t) : - 'e t Lwt.t + let l4_s + ?eq + (f : ('a -> 'b -> 'c -> 'd -> 'e Lwt.t) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) + (s3 : 'c FakeReact.S.t Value.t) + (s4 : 'd FakeReact.S.t Value.t) : 'e t Lwt.t = let s1' = Value.local s1 and s2' = Value.local s2 @@ -546,10 +580,14 @@ module React = struct ~%s2 ~%s3 ~%s4 : 'e FakeReact.S.t)]) - let l5_s ?eq (f : ('a -> 'b -> 'c -> 'd -> 'e -> 'f Lwt.t) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) - (s3 : 'c FakeReact.S.t Value.t) (s4 : 'd FakeReact.S.t Value.t) - (s5 : 'e FakeReact.S.t Value.t) : 'f t Lwt.t + let l5_s + ?eq + (f : ('a -> 'b -> 'c -> 'd -> 'e -> 'f Lwt.t) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) + (s3 : 'c FakeReact.S.t Value.t) + (s4 : 'd FakeReact.S.t Value.t) + (s5 : 'e FakeReact.S.t Value.t) : 'f t Lwt.t = let s1' = Value.local s1 and s2' = Value.local s2 @@ -573,11 +611,15 @@ module React = struct ~%s2 ~%s3 ~%s4 ~%s5 : 'f FakeReact.S.t)]) - let l6_s ?eq (f : ('a -> 'b -> 'c -> 'd -> 'e -> 'f -> 'g Lwt.t) Value.t) - (s1 : 'a FakeReact.S.t Value.t) (s2 : 'b FakeReact.S.t Value.t) - (s3 : 'c FakeReact.S.t Value.t) (s4 : 'd FakeReact.S.t Value.t) - (s5 : 'e FakeReact.S.t Value.t) (s6 : 'f FakeReact.S.t Value.t) : - 'g t Lwt.t + let l6_s + ?eq + (f : ('a -> 'b -> 'c -> 'd -> 'e -> 'f -> 'g Lwt.t) Value.t) + (s1 : 'a FakeReact.S.t Value.t) + (s2 : 'b FakeReact.S.t Value.t) + (s3 : 'c FakeReact.S.t Value.t) + (s4 : 'd FakeReact.S.t Value.t) + (s5 : 'e FakeReact.S.t Value.t) + (s6 : 'f FakeReact.S.t Value.t) : 'g t Lwt.t = let s1' = Value.local s1 and s2' = Value.local s2 @@ -603,8 +645,11 @@ module React = struct ~%s2 ~%s3 ~%s4 ~%s5 ~%s6 : 'g FakeReact.S.t)]) - let merge_s ?eq (f : ('a -> 'b -> 'a Lwt.t) Value.t) (acc : 'a) - (l : 'b FakeReact.S.t Value.t list) : 'a t Lwt.t + let merge_s + ?eq + (f : ('a -> 'b -> 'a Lwt.t) Value.t) + (acc : 'a) + (l : 'b FakeReact.S.t Value.t list) : 'a t Lwt.t = let* server_result, synced = let f (acc, _acc_b) v = @@ -718,8 +763,9 @@ module ReactiveData = struct let synced s = Value.local s |> FakeReactiveData.RList.synced module Lwt = struct - let map_p (f : ('a -> 'b Lwt.t) Value.t) - (l : 'a FakeReactiveData.RList.t Value.t) : 'b t Lwt.t + let map_p + (f : ('a -> 'b Lwt.t) Value.t) + (l : 'a FakeReactiveData.RList.t Value.t) : 'b t Lwt.t = let l' = Value.local l in let* server_result = diff --git a/src/lib/eliom_shared_content.eliom b/src/lib/eliom_shared_content.eliom index 84a48f9ea2..f260f274f7 100644 --- a/src/lib/eliom_shared_content.eliom +++ b/src/lib/eliom_shared_content.eliom @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) open%shared Js_of_ocaml @@ -135,7 +135,7 @@ module Xml = struct Js.Opt.case e##.firstChild (fun () -> Dom.appendChild e - Dom_html.document ## (createTextNode (Js.string x))) + Dom_html.document##(createTextNode (Js.string x))) (fun e -> Dom.CoerceTo.text e >>! fun e -> e##.data := Js.string x) in if not ~%synced then update (React.S.value ~%s); diff --git a/src/lib/eliom_shared_content.eliomi b/src/lib/eliom_shared_content.eliomi index d4ff6555d6..5ed5c7092d 100644 --- a/src/lib/eliom_shared_content.eliomi +++ b/src/lib/eliom_shared_content.eliomi @@ -16,7 +16,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) open Js_of_ocaml diff --git a/src/lib/eliom_tools.eliom b/src/lib/eliom_tools.eliom index 172a2c65c1..d4d84ff657 100644 --- a/src/lib/eliom_tools.eliom +++ b/src/lib/eliom_tools.eliom @@ -14,7 +14,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) [%%shared.start] @@ -349,8 +349,13 @@ module Make (DorF : module type of Eliom_content.Html.F) : HTML5_TOOLS = struct try aux service 0 pages with Not_found -> find_longest_prefix_in_hierarchy service (main, pages) - let hierarchical_menu_depth_first ?(classe = []) ?id ?(whole_tree = false) - ((_page, pages) as the_menu) ?service () + let hierarchical_menu_depth_first + ?(classe = []) + ?id + ?(whole_tree = false) + ((_page, pages) as the_menu) + ?service + () = let rec depth_first_fun pages level pos : [`Ul] elt list = let rec one_item first last i s = @@ -410,8 +415,12 @@ module Make (DorF : module type of Eliom_content.Html.F) : HTML5_TOOLS = struct : [`Ul] elt list :> [> `Ul] elt list) - let hierarchical_menu_breadth_first ?(classe = []) ?id - ((_page, pages) as the_menu) ?service () + let hierarchical_menu_breadth_first + ?(classe = []) + ?id + ((_page, pages) as the_menu) + ?service + () = let rec breadth_first_fun pages level pos : [`Ul] elt list = let rec one_item first last i s = diff --git a/src/lib/eliom_tools.eliomi b/src/lib/eliom_tools.eliomi index a2bb328c1f..aec2fd11f2 100644 --- a/src/lib/eliom_tools.eliomi +++ b/src/lib/eliom_tools.eliomi @@ -14,7 +14,7 @@ * You should have received a copy of the GNU Lesser General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - *) +*) [%%shared.start] From 946d152bb87adc6d9f0adc36993c14d6b0fad11b Mon Sep 17 00:00:00 2001 From: Vincent Balat Date: Tue, 26 Aug 2025 16:34:48 +0200 Subject: [PATCH 03/21] Copy Lwt_react into Eio_react --- src/lib/eio_react.shared.ml | 557 +++++++++++++++++++++++++++++++++++ src/lib/eio_react.shared.mli | 261 ++++++++++++++++ 2 files changed, 818 insertions(+) create mode 100644 src/lib/eio_react.shared.ml create mode 100644 src/lib/eio_react.shared.mli diff --git a/src/lib/eio_react.shared.ml b/src/lib/eio_react.shared.ml new file mode 100644 index 0000000000..cec021132e --- /dev/null +++ b/src/lib/eio_react.shared.ml @@ -0,0 +1,557 @@ +(* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *) + +open Lwt.Infix + +type 'a event = 'a React.event +type 'a signal = 'a React.signal + +module E = struct + include React.E + + (* +---------------------------------------------------------------+ + | Lwt-specific utilities | + +---------------------------------------------------------------+ *) + + let finalise f _ = f () + + let with_finaliser f event = + let r = ref () in + Gc.finalise (finalise f) r; + map + (fun x -> + ignore (Sys.opaque_identity r); + x) + event + + let next ev = + let waiter, wakener = Lwt.task () in + let ev = map (fun x -> Lwt.wakeup wakener x) (once ev) in + Lwt.on_cancel waiter (fun () -> stop ev); + waiter + + let limit f e = + (* Thread which prevents [e] from occurring while it is sleeping *) + let limiter = ref Lwt.return_unit in + (* The occurrence that is delayed until the limiter returns. *) + let delayed = ref None in + (* The resulting event. *) + let event, push = create () in + let iter = + fmap + (fun x -> + if Lwt.is_sleeping !limiter + then ( + (* The limiter is sleeping, we queue the event for later + delivering. *) + match !delayed with + | Some cell -> + (* An occurrence is already queued, replace it. *) + cell := x; + None + | None -> + let cell = ref x in + delayed := Some cell; + Lwt.on_success !limiter (fun () -> + if Lwt.is_sleeping !limiter + then delayed := None + else + let x = !cell in + delayed := None; + limiter := f (); + push x); + None) + else ( + (* Set the limiter for future events. *) + limiter := f (); + (* Send the occurrence now. *) + push x; + None)) + e + in + select [iter; event] + + let cancel_thread t () = Lwt.cancel t + + let from f = + let event, push = create () in + let rec loop () = f () >>= fun x -> push x; loop () in + let t = Lwt.pause () >>= loop in + with_finaliser (cancel_thread t) event + + let to_stream event = + let stream, push, set_ref = Lwt_stream.create_with_reference () in + set_ref (map (fun x -> push (Some x)) event); + stream + + let of_stream stream = + let event, push = create () in + let t = + Lwt.pause () >>= fun () -> + Lwt_stream.iter + (fun v -> + try push v + with exn when Lwt.Exception_filter.run exn -> + !Lwt.async_exception_hook exn) + stream + in + with_finaliser (cancel_thread t) event + + let delay thread = + match Lwt.poll thread with + | Some e -> e + | None -> + let event, send = create () in + Lwt.on_success thread (fun e -> send e; stop event); + switch never event + + let keeped = ref [] + let keep e = keeped := map ignore e :: !keeped + + (* +---------------------------------------------------------------+ + | Event transformations | + +---------------------------------------------------------------+ *) + + let run_p e = + let event, push = create () in + let iter = + fmap + (fun t -> + Lwt.on_success t (fun v -> push v); + None) + e + in + select [iter; event] + + let run_s e = + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun t -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> t)) + (fun v -> push v); + None) + e + in + select [iter; event] + + let map_p f e = + let event, push = create () in + let iter = + fmap + (fun x -> + Lwt.on_success (f x) (fun v -> push v); + None) + e + in + select [iter; event] + + let map_s f e = + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun x -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (fun v -> push v); + None) + e + in + select [iter; event] + + let app_p ef e = + let event, push = create () in + let iter = + fmap + (fun (f, x) -> + Lwt.on_success (f x) (fun v -> push v); + None) + (app (map (fun f x -> f, x) ef) e) + in + select [iter; event] + + let app_s ef e = + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun (f, x) -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (fun v -> push v); + None) + (app (map (fun f x -> f, x) ef) e) + in + select [iter; event] + + let filter_p f e = + let event, push = create () in + let iter = + fmap + (fun x -> + Lwt.on_success (f x) (function true -> push x | false -> ()); + None) + e + in + select [iter; event] + + let filter_s f e = + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun x -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (function true -> push x | false -> ()); + None) + e + in + select [iter; event] + + let fmap_p f e = + let event, push = create () in + let iter = + fmap + (fun x -> + Lwt.on_success (f x) (function Some x -> push x | None -> ()); + None) + e + in + select [iter; event] + + let fmap_s f e = + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun x -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (function Some x -> push x | None -> ()); + None) + e + in + select [iter; event] + + let diff_s f e = + let previous = ref None in + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun x -> + match !previous with + | None -> + previous := Some x; + None + | Some y -> + previous := Some x; + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x y)) + (fun v -> push v); + None) + e + in + select [iter; event] + + let accum_s ef acc = + let acc = ref acc in + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun f -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f !acc)) + (fun x -> + acc := x; + push x); + None) + ef + in + select [iter; event] + + let fold_s f acc e = + let acc = ref acc in + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun x -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f !acc x)) + (fun x -> + acc := x; + push x); + None) + e + in + select [iter; event] + + let rec rev_fold f acc = function + | [] -> Lwt.return acc + | x :: l -> rev_fold f acc l >>= fun acc -> f acc x + + let merge_s f acc el = + let event, push = create () in + let mutex = Lwt_mutex.create () in + let iter = + fmap + (fun l -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> rev_fold f acc l)) + (fun v -> push v); + None) + (merge (fun acc x -> x :: acc) [] el) + in + select [iter; event] +end + +module S = struct + include React.S + + (* +---------------------------------------------------------------+ + | Lwt-specific utilities | + +---------------------------------------------------------------+ *) + + let finalise f _ = f () + + let with_finaliser f signal = + let r = ref () in + Gc.finalise (finalise f) r; + map + (fun x -> + ignore (Sys.opaque_identity r); + x) + signal + + let limit ?eq f s = + (* Thread which prevent [s] to changes while it is sleeping *) + let limiter = ref (f ()) in + (* The occurrence that is delayed until the limiter returns. *) + let delayed = ref None in + (* The resulting event. *) + let event, push = E.create () in + let iter = + E.fmap + (fun x -> + if Lwt.is_sleeping !limiter + then ( + (* The limiter is sleeping, we queue the event for later + delivering. *) + match !delayed with + | Some cell -> + (* An occurrence is already queued, replace it. *) + cell := x; + None + | None -> + let cell = ref x in + delayed := Some cell; + Lwt.on_success !limiter (fun () -> + if Lwt.is_sleeping !limiter + then delayed := None + else + let x = !cell in + delayed := None; + limiter := f (); + push x); + None) + else ( + (* Set the limiter for future events. *) + limiter := f (); + (* Send the occurrence now. *) + push x; + None)) + (changes s) + in + hold ?eq (value s) (E.select [iter; event]) + + let keeped = ref [] + let keep s = keeped := map ignore s :: !keeped + + (* +---------------------------------------------------------------+ + | Signal transformations | + +---------------------------------------------------------------+ *) + + let run_s ?eq s = + let event, push = E.create () in + let mutex = Lwt_mutex.create () in + let iter = + E.fmap + (fun t -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> t)) + (fun v -> push v); + None) + (changes s) + in + Lwt_mutex.with_lock mutex (fun () -> value s) >>= fun x -> + Lwt.return (hold ?eq x (E.select [iter; event])) + + let map_s ?eq f s = + let event, push = E.create () in + let mutex = Lwt_mutex.create () in + let iter = + E.fmap + (fun x -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (fun v -> push v); + None) + (changes s) + in + Lwt_mutex.with_lock mutex (fun () -> f (value s)) >>= fun x -> + Lwt.return (hold ?eq x (E.select [iter; event])) + + let app_s ?eq sf s = + let event, push = E.create () in + let mutex = Lwt_mutex.create () in + let iter = + E.fmap + (fun (f, x) -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (fun v -> push v); + None) + (E.app (E.map (fun f x -> f, x) (changes sf)) (changes s)) + in + Lwt_mutex.with_lock mutex (fun () -> (value sf) (value s)) >>= fun x -> + Lwt.return (hold ?eq x (E.select [iter; event])) + + let filter_s ?eq f i s = + let event, push = E.create () in + let mutex = Lwt_mutex.create () in + let iter = + E.fmap + (fun x -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (function true -> push x | false -> ()); + None) + (changes s) + in + let x = value s in + Lwt_mutex.with_lock mutex (fun () -> f x) >>= function + | true -> Lwt.return (hold ?eq x (E.select [iter; event])) + | false -> Lwt.return (hold ?eq i (E.select [iter; event])) + + let fmap_s ?eq f i s = + let event, push = E.create () in + let mutex = Lwt_mutex.create () in + let iter = + E.fmap + (fun x -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (function Some x -> push x | None -> ()); + None) + (changes s) + in + Lwt_mutex.with_lock mutex (fun () -> f (value s)) >>= function + | Some x -> Lwt.return (hold ?eq x (E.select [iter; event])) + | None -> Lwt.return (hold ?eq i (E.select [iter; event])) + + let diff_s f s = + let previous = ref (value s) in + let event, push = E.create () in + let mutex = Lwt_mutex.create () in + let iter = + E.fmap + (fun x -> + let y = !previous in + previous := x; + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x y)) + (fun v -> push v); + None) + (changes s) + in + E.select [iter; event] + + let sample_s f e s = E.map_s (fun x -> f x (value s)) e + let accum_s ?eq ef i = hold ?eq i (E.accum_s ef i) + let fold_s ?eq f i e = hold ?eq i (E.fold_s f i e) + + let rec rev_fold f acc = function + | [] -> Lwt.return acc + | x :: l -> rev_fold f acc l >>= fun acc -> f acc x + + let merge_s ?eq f acc sl = + let s = merge (fun acc x -> x :: acc) [] sl in + let event, push = E.create () in + let mutex = Lwt_mutex.create () in + let iter = + E.fmap + (fun l -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> rev_fold f acc l)) + (fun v -> push v); + None) + (changes s) + in + Lwt_mutex.with_lock mutex (fun () -> rev_fold f acc (value s)) >>= fun x -> + Lwt.return (hold ?eq x (E.select [iter; event])) + + let l1_s ?eq f s1 = map_s ?eq f s1 + + let l2_s ?eq f s1 s2 = + (* Some details about the use of [fun _ _ -> false] on + https://github.com/ocsigen/lwt/pull/893#pullrequestreview-783083496 *) + map_s ?eq + (fun (x1, x2) -> f x1 x2) + (l2 ~eq:(fun _ _ -> false) (fun x1 x2 -> x1, x2) s1 s2) + + let l3_s ?eq f s1 s2 s3 = + map_s ?eq + (fun (x1, x2, x3) -> f x1 x2 x3) + (l3 ~eq:(fun _ _ -> false) (fun x1 x2 x3 -> x1, x2, x3) s1 s2 s3) + + let l4_s ?eq f s1 s2 s3 s4 = + map_s ?eq + (fun (x1, x2, x3, x4) -> f x1 x2 x3 x4) + (l4 + ~eq:(fun _ _ -> false) + (fun x1 x2 x3 x4 -> x1, x2, x3, x4) + s1 s2 s3 s4) + + let l5_s ?eq f s1 s2 s3 s4 s5 = + map_s ?eq + (fun (x1, x2, x3, x4, x5) -> f x1 x2 x3 x4 x5) + (l5 + ~eq:(fun _ _ -> false) + (fun x1 x2 x3 x4 x5 -> x1, x2, x3, x4, x5) + s1 s2 s3 s4 s5) + + let l6_s ?eq f s1 s2 s3 s4 s5 s6 = + map_s ?eq + (fun (x1, x2, x3, x4, x5, x6) -> f x1 x2 x3 x4 x5 x6) + (l6 + ~eq:(fun _ _ -> false) + (fun x1 x2 x3 x4 x5 x6 -> x1, x2, x3, x4, x5, x6) + s1 s2 s3 s4 s5 s6) + + (* +---------------------------------------------------------------+ + | Monadic interface | + +---------------------------------------------------------------+ *) + + let return = const + + let bind_s ?eq s f = + let event, push = E.create () in + let mutex = Lwt_mutex.create () in + let iter = + E.fmap + (fun x -> + Lwt.on_success + (Lwt_mutex.with_lock mutex (fun () -> f x)) + (fun v -> push v); + None) + (changes s) + in + Lwt_mutex.with_lock mutex (fun () -> f (value s)) >>= fun x -> + Lwt.return (switch ?eq (hold ~eq:( == ) x (E.select [iter; event]))) +end diff --git a/src/lib/eio_react.shared.mli b/src/lib/eio_react.shared.mli new file mode 100644 index 0000000000..ddf47d419c --- /dev/null +++ b/src/lib/eio_react.shared.mli @@ -0,0 +1,261 @@ +(* This file is part of Lwt, released under the MIT license. See LICENSE.md for + details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *) + +(** React utilities *) + +(** This module is an overlay for the [React] module. You can open it + instead of the [React] module in order to get all of [React]'s functions + plus Lwt ones. + + This module is provided by OPAM package [lwt_react]. Link with ocamlfind + package [lwt_react]. *) + +type 'a event = 'a React.event +(** Type of events. *) + +type 'a signal = 'a React.signal +(** Type of signals. *) + +module E : sig + include module type of React.E + + (** {2 Lwt-specific utilities} *) + + val with_finaliser : (unit -> unit) -> 'a event -> 'a event + (** [with_finaliser f e] returns an event [e'] which behave as + [e], except that [f] is called when [e'] is garbage + collected. *) + + val next : 'a event -> 'a Lwt.t + (** [next e] returns the next occurrence of [e]. + + Avoid trying to create an “asynchronous loop” by calling [next e] again in + a callback attached to the promise returned by [next e]: + + - The callback is called within the React update step, so calling [next e] + within it will return a promise that is fulfilled with the same value as + the current occurrence. + - If you instead arrange for the React update step to end (for example, by + calling [Lwt.pause ()] within the callback), multiple React update steps + may occur before the callback calls [next e] again, so some occurrences + can be effectively “lost.” + + To robustly asynchronously process occurrences of [e] in a loop, use + [to_stream e], and repeatedly call {!Lwt_stream.next} on the resulting + stream. *) + + val limit : (unit -> unit Lwt.t) -> 'a event -> 'a event + (** [limit f e] limits the rate of [e] with [f]. + + For example, to limit the rate of an event to 1 per second you + can use: [limit (fun () -> Lwt_unix.sleep 1.0) event]. *) + + val from : (unit -> 'a Lwt.t) -> 'a event + (** [from f] creates an event which occurs each time [f ()] + returns a value. If [f] raises an exception, the event is just + stopped. *) + + val to_stream : 'a event -> 'a Lwt_stream.t + (** Creates a stream holding all values occurring on the given + event *) + + val of_stream : 'a Lwt_stream.t -> 'a event + (** [of_stream stream] creates an event which occurs each time a + value is available on the stream. + + If updating the event causes an exception at any point during the update + step, the exception is passed to [!]{!Lwt.async_exception_hook}, which + terminates the process by default. *) + + val delay : 'a event Lwt.t -> 'a event + (** [delay promise] is an event which does not occur until + [promise] resolves. Then it behaves as the event returned by + [promise]. *) + + val keep : 'a event -> unit + (** [keep e] keeps a reference to [e] so it will never be garbage + collected. *) + + (** {2 Threaded versions of React transformation functions} *) + + (** The following functions behave as their [React] counterpart, + except that they take functions that may yield. + + As usual the [_s] suffix is used when calls are serialized, and + the [_p] suffix is used when they are not. + + Note that [*_p] functions may not preserve event order. *) + + val app_s : ('a -> 'b Lwt.t) event -> 'a event -> 'b event + val app_p : ('a -> 'b Lwt.t) event -> 'a event -> 'b event + val map_s : ('a -> 'b Lwt.t) -> 'a event -> 'b event + val map_p : ('a -> 'b Lwt.t) -> 'a event -> 'b event + val filter_s : ('a -> bool Lwt.t) -> 'a event -> 'a event + val filter_p : ('a -> bool Lwt.t) -> 'a event -> 'a event + val fmap_s : ('a -> 'b option Lwt.t) -> 'a event -> 'b event + val fmap_p : ('a -> 'b option Lwt.t) -> 'a event -> 'b event + val diff_s : ('a -> 'a -> 'b Lwt.t) -> 'a event -> 'b event + val accum_s : ('a -> 'a Lwt.t) event -> 'a -> 'a event + val fold_s : ('a -> 'b -> 'a Lwt.t) -> 'a -> 'b event -> 'a event + val merge_s : ('a -> 'b -> 'a Lwt.t) -> 'a -> 'b event list -> 'a event + val run_s : 'a Lwt.t event -> 'a event + val run_p : 'a Lwt.t event -> 'a event +end + +module S : sig + include module type of React.S + + (** {2 Monadic interface} *) + + val return : 'a -> 'a signal + (** Same as [const]. *) + + val bind : + ?eq:('b -> 'b -> bool) + -> 'a signal + -> ('a -> 'b signal) + -> 'b signal + (** [bind ?eq s f] is initially [f x] where [x] is the current + value of [s]. Each time [s] changes to a new value [y], [bind + signal f] is set to [f y], until the next change of + [signal]. *) + + val bind_s : + ?eq:('b -> 'b -> bool) + -> 'a signal + -> ('a -> 'b signal Lwt.t) + -> 'b signal Lwt.t + (** Same as {!bind} except that [f] returns a promise. Calls to [f] + are serialized. *) + + (** {2 Lwt-specific utilities} *) + + val with_finaliser : (unit -> unit) -> 'a signal -> 'a signal + (** [with_finaliser f s] returns a signal [s'] which behaves as + [s], except that [f] is called when [s'] is garbage + collected. *) + + val limit : + ?eq:('a -> 'a -> bool) + -> (unit -> unit Lwt.t) + -> 'a signal + -> 'a signal + (** [limit f s] limits the rate of [s] update with [f]. + + For example, to limit it to 1 per second, you can use: [limit + (fun () -> Lwt_unix.sleep 1.0) s]. *) + + val keep : 'a signal -> unit + (** [keep s] keeps a reference to [s] so it will never be garbage + collected. *) + + (** {2 Threaded versions of React transformation functions} *) + + (** The following functions behave as their [React] counterpart, + except that they take functions that may yield. + + The [_s] suffix means that calls are serialized. + *) + + val app_s : + ?eq:('b -> 'b -> bool) + -> ('a -> 'b Lwt.t) signal + -> 'a signal + -> 'b signal Lwt.t + + val map_s : + ?eq:('b -> 'b -> bool) + -> ('a -> 'b Lwt.t) + -> 'a signal + -> 'b signal Lwt.t + + val filter_s : + ?eq:('a -> 'a -> bool) + -> ('a -> bool Lwt.t) + -> 'a + -> 'a signal + -> 'a signal Lwt.t + + val fmap_s : + ?eq:('b -> 'b -> bool) + -> ('a -> 'b option Lwt.t) + -> 'b + -> 'a signal + -> 'b signal Lwt.t + + val diff_s : ('a -> 'a -> 'b Lwt.t) -> 'a signal -> 'b event + val sample_s : ('b -> 'a -> 'c Lwt.t) -> 'b event -> 'a signal -> 'c event + + val accum_s : + ?eq:('a -> 'a -> bool) + -> ('a -> 'a Lwt.t) event + -> 'a + -> 'a signal + + val fold_s : + ?eq:('a -> 'a -> bool) + -> ('a -> 'b -> 'a Lwt.t) + -> 'a + -> 'b event + -> 'a signal + + val merge_s : + ?eq:('a -> 'a -> bool) + -> ('a -> 'b -> 'a Lwt.t) + -> 'a + -> 'b signal list + -> 'a signal Lwt.t + + val l1_s : + ?eq:('b -> 'b -> bool) + -> ('a -> 'b Lwt.t) + -> 'a signal + -> 'b signal Lwt.t + + val l2_s : + ?eq:('c -> 'c -> bool) + -> ('a -> 'b -> 'c Lwt.t) + -> 'a signal + -> 'b signal + -> 'c signal Lwt.t + + val l3_s : + ?eq:('d -> 'd -> bool) + -> ('a -> 'b -> 'c -> 'd Lwt.t) + -> 'a signal + -> 'b signal + -> 'c signal + -> 'd signal Lwt.t + + val l4_s : + ?eq:('e -> 'e -> bool) + -> ('a -> 'b -> 'c -> 'd -> 'e Lwt.t) + -> 'a signal + -> 'b signal + -> 'c signal + -> 'd signal + -> 'e signal Lwt.t + + val l5_s : + ?eq:('f -> 'f -> bool) + -> ('a -> 'b -> 'c -> 'd -> 'e -> 'f Lwt.t) + -> 'a signal + -> 'b signal + -> 'c signal + -> 'd signal + -> 'e signal + -> 'f signal Lwt.t + + val l6_s : + ?eq:('g -> 'g -> bool) + -> ('a -> 'b -> 'c -> 'd -> 'e -> 'f -> 'g Lwt.t) + -> 'a signal + -> 'b signal + -> 'c signal + -> 'd signal + -> 'e signal + -> 'f signal + -> 'g signal Lwt.t + + val run_s : ?eq:('a -> 'a -> bool) -> 'a Lwt.t signal -> 'a signal Lwt.t +end From 512fcf7fe02ad713aa46db524fd35934a4e99565 Mon Sep 17 00:00:00 2001 From: Vincent Balat Date: Tue, 26 Aug 2025 17:41:38 +0200 Subject: [PATCH 04/21] Replace syntactically Lwt_stream by Eliom_stream and Lwt_react by Eio_react --- src/lib/client/dune.client | 4 ++++ src/lib/eio_react.shared.ml | 4 ++-- src/lib/eio_react.shared.mli | 6 +++--- src/lib/eliom_bus.client.ml | 10 +++++----- src/lib/eliom_bus.client.mli | 4 ++-- src/lib/eliom_bus.server.ml | 8 ++++---- src/lib/eliom_bus.server.mli | 4 ++-- src/lib/eliom_comet.client.ml | 14 +++++++------- src/lib/eliom_comet.client.mli | 6 +++--- src/lib/eliom_comet.server.ml | 30 +++++++++++++++--------------- src/lib/eliom_comet.server.mli | 8 ++++---- src/lib/eliom_react.client.ml | 6 +++--- src/lib/eliom_react.server.ml | 6 +++--- src/lib/eliom_shared.eliom | 16 ++++++++-------- src/lib/eliom_stream.shared.ml | 11 ++++++----- src/lib/eliom_stream.shared.mli | 32 ++++++++++++++++---------------- src/lib/server/dune.server | 4 ++++ 17 files changed, 91 insertions(+), 82 deletions(-) diff --git a/src/lib/client/dune.client b/src/lib/client/dune.client index 68f8ab59b5..375a16e59f 100644 --- a/src/lib/client/dune.client +++ b/src/lib/client/dune.client @@ -1,3 +1,5 @@ +(rule (copy# ../eio_react.shared.ml eio_react.ml)) +(rule (copy# ../eio_react.shared.mli eio_react.mli)) (rule (copy# ../eliom_bus.client.ml eliom_bus.ml)) (rule (copy# ../eliom_bus.client.mli eliom_bus.mli)) (rule (copy# ../eliom_client.client.ml eliom_client.ml)) @@ -90,6 +92,8 @@ (with-stdout-to %{target} (chdir .. (run ppx_eliom_client --as-pp --intf %{deps}))))) (rule (copy# ../eliom_shared_sigs.shared.mli eliom_shared_sigs.mli)) +(rule (copy# ../eliom_stream.shared.ml eliom_stream.ml)) +(rule (copy# ../eliom_stream.shared.mli eliom_stream.mli)) (rule (target eliom_tools.ml) (deps ../eliom_tools.eliom) (action (with-stdout-to %{target} diff --git a/src/lib/eio_react.shared.ml b/src/lib/eio_react.shared.ml index cec021132e..4a193f2032 100644 --- a/src/lib/eio_react.shared.ml +++ b/src/lib/eio_react.shared.ml @@ -80,7 +80,7 @@ module E = struct with_finaliser (cancel_thread t) event let to_stream event = - let stream, push, set_ref = Lwt_stream.create_with_reference () in + let stream, push, set_ref = Eliom_stream.create_with_reference () in set_ref (map (fun x -> push (Some x)) event); stream @@ -88,7 +88,7 @@ module E = struct let event, push = create () in let t = Lwt.pause () >>= fun () -> - Lwt_stream.iter + Eliom_stream.iter (fun v -> try push v with exn when Lwt.Exception_filter.run exn -> diff --git a/src/lib/eio_react.shared.mli b/src/lib/eio_react.shared.mli index ddf47d419c..614f8e4498 100644 --- a/src/lib/eio_react.shared.mli +++ b/src/lib/eio_react.shared.mli @@ -41,7 +41,7 @@ module E : sig can be effectively “lost.” To robustly asynchronously process occurrences of [e] in a loop, use - [to_stream e], and repeatedly call {!Lwt_stream.next} on the resulting + [to_stream e], and repeatedly call {!Eliom_stream.next} on the resulting stream. *) val limit : (unit -> unit Lwt.t) -> 'a event -> 'a event @@ -55,11 +55,11 @@ module E : sig returns a value. If [f] raises an exception, the event is just stopped. *) - val to_stream : 'a event -> 'a Lwt_stream.t + val to_stream : 'a event -> 'a Eliom_stream.t (** Creates a stream holding all values occurring on the given event *) - val of_stream : 'a Lwt_stream.t -> 'a event + val of_stream : 'a Eliom_stream.t -> 'a event (** [of_stream stream] creates an event which occurs each time a value is available on the stream. diff --git a/src/lib/eliom_bus.client.ml b/src/lib/eliom_bus.client.ml index 2440901be2..6240ce0287 100644 --- a/src/lib/eliom_bus.client.ml +++ b/src/lib/eliom_bus.client.ml @@ -29,7 +29,7 @@ module Ecb = Eliom_comet_base type ('a, 'b) t = { channel : 'b Ecb.wrapped_channel - ; stream : 'b Lwt_stream.t Lazy.t + ; stream : 'b Eliom_stream.t Lazy.t ; queue : 'a Queue.t ; mutable max_size : int ; write : 'a list -> unit Lwt.t @@ -42,7 +42,7 @@ type ('a, 'b) t = let consume (t, u) s = let t' = Lwt.catch - (fun () -> Lwt_stream.iter (fun _ -> ()) s) + (fun () -> Eliom_stream.iter (fun _ -> ()) s) (fun e -> (match Lwt.state t with Lwt.Sleep -> Lwt.wakeup_exn u e | _ -> ()); Lwt.fail e) @@ -50,10 +50,10 @@ let consume (t, u) s = Lwt.choose [Lwt.bind t (fun _ -> Lwt.return_unit); t'] let clone_exn (t, u) s = - let s' = Lwt_stream.clone s in - Lwt_stream.from (fun () -> + let s' = Eliom_stream.clone s in + Eliom_stream.from (fun () -> Lwt.catch - (fun () -> Lwt.choose [Lwt_stream.get s'; t]) + (fun () -> Lwt.choose [Eliom_stream.get s'; t]) (fun e -> (match Lwt.state t with Lwt.Sleep -> Lwt.wakeup_exn u e | _ -> ()); Lwt.fail e)) diff --git a/src/lib/eliom_bus.client.mli b/src/lib/eliom_bus.client.mli index 06950d8396..530c9cf692 100644 --- a/src/lib/eliom_bus.client.mli +++ b/src/lib/eliom_bus.client.mli @@ -25,14 +25,14 @@ type ('a, 'b) t -val stream : ('a, 'b) t -> 'b Lwt_stream.t +val stream : ('a, 'b) t -> 'b Eliom_stream.t (** [stream b] returns the stream of data sent to bus [b]. A new stream is created each time this function is called. Some messages from the bus can be lost if they were sent before the call to [stream]. If you need to receive every message, use original stream instead. *) -val original_stream : ('a, 'b) t -> 'b Lwt_stream.t +val original_stream : ('a, 'b) t -> 'b Eliom_stream.t (** [stream b] returns the stream of data sent to bus [b]. A new stream is created each time this function is called. Every messages sent to the bus after the generation of the page are diff --git a/src/lib/eliom_bus.server.ml b/src/lib/eliom_bus.server.ml index 6ba8b81e20..0e8cae932c 100644 --- a/src/lib/eliom_bus.server.ml +++ b/src/lib/eliom_bus.server.ml @@ -23,7 +23,7 @@ open Lwt.Syntax module Ecb = Eliom_comet_base type ('a, 'b) t = - { stream : 'b Lwt_stream.t + { stream : 'b Eliom_stream.t ; scope : Eliom_comet.Channel.comet_scope ; name : string option ; channel : 'b Eliom_comet.Channel.t option @@ -46,7 +46,7 @@ let internal_wrap (bus : ('a, 'b) t) : | None -> Eliom_comet.Channel.create ~scope:bus.scope ?name:bus.name ?size:bus.size - (Lwt_stream.clone bus.stream) + (Eliom_stream.clone bus.stream) | Some c -> c in (match bus.service_registered with @@ -92,7 +92,7 @@ let deriving_to_list : 'a Deriving_Json.t -> 'a list Deriving_Json.t = let create_filtered ?scope ?name ?size ~filter typ = (*The stream*) - let stream, push = Lwt_stream.create () in + let stream, push = Eliom_stream.create () in let push x = let* y = filter x in push (Some y); Lwt.return_unit @@ -107,7 +107,7 @@ let create_filtered ?scope ?name ?size ~filter typ = | `Site -> Some (Eliom_comet.Channel.create ~scope ?name ?size - (Lwt_stream.clone stream)) + (Eliom_stream.clone stream)) | `Client_process _ -> None in let typ_list = deriving_to_list typ in diff --git a/src/lib/eliom_bus.server.mli b/src/lib/eliom_bus.server.mli index c7debfed13..13dea5e533 100644 --- a/src/lib/eliom_bus.server.mli +++ b/src/lib/eliom_bus.server.mli @@ -64,11 +64,11 @@ val create_filtered : some information, like IP address, or user id. *) -val stream : ('a, 'b) t -> 'b Lwt_stream.t +val stream : ('a, 'b) t -> 'b Eliom_stream.t (** [stream b] returns the stream of data sent to bus [b]. Notice you should not use that function multiple times on the same bus, it will return the same stream. If you want to receive multiple times the - same data, you should copy the stream with [Lwt_stream.clone] *) + same data, you should copy the stream with [Eliom_stream.clone] *) val write : ('a, 'b) t -> 'a -> unit Lwt.t (** [write b x] sends the value [x] on the bus [b]. Every participant, diff --git a/src/lib/eliom_comet.client.ml b/src/lib/eliom_comet.client.ml index 119d65407e..95eca2d551 100644 --- a/src/lib/eliom_comet.client.ml +++ b/src/lib/eliom_comet.client.ml @@ -637,12 +637,12 @@ end type 'a handler = { hd_service_handler : 'a Service_handler.t - ; hd_stream : (string * int option * string Ecb.channel_data) Lwt_stream.t } + ; hd_stream : (string * int option * string Ecb.channel_data) Eliom_stream.t } let handler_stream hd = - Lwt_stream.map_list + Eliom_stream.map_list (fun x -> x) - (Lwt_stream.from (fun () -> + (Eliom_stream.from (fun () -> Lwt.try_bind (fun () -> Service_handler.wait_data hd) (fun s -> Lwt.return_some s) @@ -745,7 +745,7 @@ let check_and_update_position position msg_pos data = let register' hd position (_ : Ecb.comet_service) (chan_id : 'a Ecb.chan_id) = let chan_id = Ecb.string_of_chan_id chan_id in let stream = - Lwt_stream.filter_map_s + Eliom_stream.filter_map_s (function | id, pos, data when id = chan_id && check_and_update_position position pos data -> ( @@ -754,7 +754,7 @@ let register' hd position (_ : Ecb.comet_service) (chan_id : 'a Ecb.chan_id) = | Ecb.Closed -> Lwt.fail Channel_closed | Ecb.Data x -> Lwt.return_some (unmarshal x : 'a)) | _ -> Lwt.return_none) - (Lwt_stream.clone hd.hd_stream) + (Eliom_stream.clone hd.hd_stream) in let protect_and_close t = let t' = Lwt.protected t in @@ -763,7 +763,7 @@ let register' hd position (_ : Ecb.comet_service) (chan_id : 'a Ecb.chan_id) = t' in (* protect the stream from cancels *) - Lwt_stream.from (fun () -> protect_and_close (Lwt_stream.get stream)) + Eliom_stream.from (fun () -> protect_and_close (Eliom_stream.get stream)) let register_stateful ?(wake = true) service chan_id = let hd = get_stateful_hd service in @@ -809,7 +809,7 @@ let is_active () = Hashtbl.fold f stateful_handler_table `Active) module Channel = struct - type 'a t = 'a Lwt_stream.t + type 'a t = 'a Eliom_stream.t end let force_link = () diff --git a/src/lib/eliom_comet.client.mli b/src/lib/eliom_comet.client.mli index bd12ee9876..2258658d68 100644 --- a/src/lib/eliom_comet.client.mli +++ b/src/lib/eliom_comet.client.mli @@ -31,7 +31,7 @@ To stop receiving inputs from a channel, use Lwt.cancel on a thread waiting for data. For instance, if you iterate with - [ let t = Lwt_stream.iter f %channel ] calling [Lwt.cancel t] + [ let t = Eliom_stream.iter f %channel ] calling [Lwt.cancel t] will close the channel. *) exception Channel_full @@ -115,7 +115,7 @@ module Configuration : sig end module Channel : sig - type 'a t = 'a Lwt_stream.t + type 'a t = 'a Eliom_stream.t end (**/**) @@ -123,7 +123,7 @@ end val register : ?wake:bool -> 'a Eliom_comet_base.wrapped_channel - -> 'a Lwt_stream.t + -> 'a Eliom_stream.t (** if wake is false, the registration of the channel won't activate the handling loop ( no request will be sent ). Default is true *) diff --git a/src/lib/eliom_comet.server.ml b/src/lib/eliom_comet.server.ml index 930dc50de9..92c52483b9 100644 --- a/src/lib/eliom_comet.server.ml +++ b/src/lib/eliom_comet.server.ml @@ -90,7 +90,7 @@ let timeout () = module Stateless : sig type channel - val create : ?name:string -> size:int -> string Lwt_stream.t -> channel + val create : ?name:string -> size:int -> string Eliom_stream.t -> channel val get_id : channel -> string val get_service : unit -> Eliom_comet_base.comet_service val get_kind : newest:bool -> channel -> Eliom_comet_base.stateless_kind @@ -152,7 +152,7 @@ end = struct in ignore (Lwt.with_value Eliom_common.sp_key None @@ fun () -> - Lwt_stream.iter_s f stream + Eliom_stream.iter_s f stream : unit Lwt.t) let make_name name = "stateless:" ^ name @@ -313,7 +313,7 @@ module Stateful : sig val create_unlimited : ?scope:Eliom_common.client_process_scope -> ?name:chan_id - -> _ Lwt_stream.t + -> _ Eliom_stream.t -> t val get_id : t -> string @@ -347,7 +347,7 @@ end = struct get garbage collected *) mutable events : Obj.t option } | Stream of - { mutable stream : string Eliom_comet_base.channel_data Lwt_stream.t + { mutable stream : string Eliom_comet_base.channel_data Eliom_stream.t ; mutable waiter : waiter } type handler = @@ -435,7 +435,7 @@ end = struct let stream_waiter s = Lwt.with_value Eliom_common.sp_key None @@ fun () -> Lwt.no_cancel - (let* _ = Lwt_stream.peek s in + (let* _ = Eliom_stream.peek s in Lwt.return `Data) (** read up to [n] messages in the list of streams [streams] without blocking. *) @@ -453,7 +453,7 @@ end = struct | (id, Stream ({stream; _} as s)) :: rem -> let l = Lwt.with_value Eliom_common.sp_key None @@ fun () -> - Lwt_stream.get_available_up_to n stream + Eliom_stream.get_available_up_to n stream in if l <> [] then s.waiter <- stream_waiter stream; take (n - List.length l) (List.rev_map (fun v -> id, v) l @ acc) rem @@ -709,7 +709,7 @@ end = struct Logs.info ~src:section (fun fmt -> fmt "create channel %s" name); let stream = Lwt.with_value Eliom_common.sp_key None @@ fun () -> - Lwt_stream.map (fun x -> Eliom_comet_base.Data (marshal x)) stream + Eliom_stream.map (fun x -> Eliom_comet_base.Data (marshal x)) stream in let channel = Stream {stream; waiter = stream_waiter stream} in if List.mem name handler.hd_registered_chan_id @@ -748,16 +748,16 @@ module Channel : sig ?scope:[< comet_scope] -> ?name:string -> ?size:int - -> 'a Lwt_stream.t + -> 'a Eliom_stream.t -> 'a t val create_unlimited : ?scope:Eliom_common.client_process_scope -> ?name:string - -> 'a Lwt_stream.t + -> 'a Eliom_stream.t -> 'a t - val create_newest : ?name:string -> 'a Lwt_stream.t -> 'a t + val create_newest : ?name:string -> 'a Eliom_stream.t -> 'a t val get_wrapped : 'a t -> 'a Eliom_comet_base.wrapped_channel val external_channel : @@ -813,13 +813,13 @@ end = struct Stateless (Stateless.create ?name ~size ( Lwt.with_value Eliom_common.sp_key None @@ fun () -> - Lwt_stream.map marshal stream )) + Eliom_stream.map marshal stream )) let create_stateless_newest_channel ?name stream = Stateless_newest (Stateless.create ?name ~size:1 ( Lwt.with_value Eliom_common.sp_key None @@ fun () -> - Lwt_stream.map marshal stream )) + Eliom_stream.map marshal stream )) let create_stateful ?scope ?name ?(size = 1000) events = { channel = create_stateful_channel ?scope ?name ~size events @@ -845,13 +845,13 @@ end = struct | None -> create_stateful ?name ~size events | Some (`Client_process _ as scope) -> create_stateful ~scope ?name ~size events - | Some `Site -> create_stateless ?name ~size (Lwt_react.E.to_stream events) + | Some `Site -> create_stateless ?name ~size (Eio_react.E.to_stream events) let create ?scope ?name ?(size = 1000) stream = match scope with - | None -> create_stateful ?name ~size (Lwt_react.E.of_stream stream) + | None -> create_stateful ?name ~size (Eio_react.E.of_stream stream) | Some (`Client_process _ as scope) -> - create_stateful ~scope ?name ~size (Lwt_react.E.of_stream stream) + create_stateful ~scope ?name ~size (Eio_react.E.of_stream stream) | Some `Site -> create_stateless ?name ~size stream let external_channel ?(history = 1) ?(newest = false) ~prefix ~name () = diff --git a/src/lib/eliom_comet.server.mli b/src/lib/eliom_comet.server.mli index 77df666d9b..0c3e8b9c1c 100644 --- a/src/lib/eliom_comet.server.mli +++ b/src/lib/eliom_comet.server.mli @@ -40,7 +40,7 @@ module Channel : sig ?scope:[< comet_scope] -> ?name:string -> ?size:int - -> 'a Lwt_stream.t + -> 'a Eliom_stream.t -> 'a t (** [create s] returns a channel sending the values returned by stream [s]. @@ -68,7 +68,7 @@ module Channel : sig A channel can be used only once on client side. To be able to receive the same data multiple times on client side, use - [create (Lwt_stream.clone s)] every time. + [create (Eliom_stream.clone s)] every time. To enforce the limit on the buffer size, the data is read into [stream] as soon as possible: If you want a channel that reads @@ -87,7 +87,7 @@ module Channel : sig val create_unlimited : ?scope:Eliom_common.client_process_scope -> ?name:string - -> 'a Lwt_stream.t + -> 'a Eliom_stream.t -> 'a t (** [create_unlimited s] creates a channel which does not read immediately on the stream. It is read only when the client @@ -97,7 +97,7 @@ module Channel : sig the stream increases and your clients don't read it, you may have memory leaks. *) - val create_newest : ?name:string -> 'a Lwt_stream.t -> 'a t + val create_newest : ?name:string -> 'a Eliom_stream.t -> 'a t (** [create_newest s] is similar to [create ~scope:Eliom_common.site_scope s] but only the last message is returned to the client. *) diff --git a/src/lib/eliom_react.client.ml b/src/lib/eliom_react.client.ml index 7e111aef95..ba48819212 100644 --- a/src/lib/eliom_react.client.ml +++ b/src/lib/eliom_react.client.ml @@ -22,7 +22,7 @@ open Lwt.Syntax *) (* Module for event unwrapping *) -open Lwt_react +open Eio_react open Lwt.Infix let section = Logs.Src.create "eliom:comet" @@ -50,9 +50,9 @@ module Down = struct (* We want to catch more exceptions here than the usual exceptions caught in Eliom_comet. For example Channel_full. *) (* We transform the stream into a stream with exception: *) - let stream = Lwt_stream.wrap_exn channel in + let stream = Eliom_stream.wrap_exn channel in Lwt.async (fun () -> - Lwt_stream.iter_s + Eliom_stream.iter_s (function | Error exn -> let* () = handle_react_exn ~exn () in diff --git a/src/lib/eliom_react.server.ml b/src/lib/eliom_react.server.ml index f80fe58624..025c1ebb05 100644 --- a/src/lib/eliom_react.server.ml +++ b/src/lib/eliom_react.server.ml @@ -22,7 +22,7 @@ open Lwt.Syntax (* Module for event wrapping and related functions *) -open Lwt_react +open Eio_react module Down = struct type 'a stateful = @@ -140,7 +140,7 @@ module S = struct type 'a stateless = { channel : 'a Eliom_comet.Channel.t - ; stream : 'a Lwt_stream.t + ; stream : 'a Eliom_stream.t ; (* avoid garbage collection *) sl_signal : 'a S.t } [@@warning "-69"] @@ -195,7 +195,7 @@ module S = struct | Some t -> S.limit (fun () -> Lwt_unix.sleep t) s in let store = make_store s in - let stream = Lwt_stream.from (read_store store) in + let stream = Eliom_stream.from (read_store store) in let channel = Eliom_comet.Channel.create_unlimited ?name stream in let value : 'a = S.value s in ( channel diff --git a/src/lib/eliom_shared.eliom b/src/lib/eliom_shared.eliom index 8156139a85..f41155b0ab 100644 --- a/src/lib/eliom_shared.eliom +++ b/src/lib/eliom_shared.eliom @@ -21,7 +21,7 @@ [%%shared open Lwt.Syntax -(* put this in Lwt_react? Find a better name? *) +(* put this in Eio_react? Find a better name? *) let to_signal ~init ?eq (th : 'a React.S.t Lwt.t) : 'a React.S.t = let s, set = React.S.create ?eq init in Lwt.async (fun () -> @@ -80,43 +80,43 @@ module React = struct end module Lwt = struct - let map_s = Lwt_react.S.map_s + let map_s = Eio_react.S.map_s let map_s_init ~init ?eq f s = let th = map_s ?eq f s in to_signal ~init ?eq th - let l2_s = Lwt_react.S.l2_s + let l2_s = Eio_react.S.l2_s let l2_s_init ~init ?eq f s1 s2 = let th = l2_s ?eq f s1 s2 in to_signal ~init ?eq th - let l3_s = Lwt_react.S.l3_s + let l3_s = Eio_react.S.l3_s let l3_s_init ~init ?eq f s1 s2 s3 = let th = l3_s ?eq f s1 s2 s3 in to_signal ~init ?eq th - let l4_s = Lwt_react.S.l4_s + let l4_s = Eio_react.S.l4_s let l4_s_init ~init ?eq f s1 s2 s3 s4 = let th = l4_s ?eq f s1 s2 s3 s4 in to_signal ~init ?eq th - let l5_s = Lwt_react.S.l5_s + let l5_s = Eio_react.S.l5_s let l5_s_init ~init ?eq f s1 s2 s3 s4 s5 = let th = l5_s ?eq f s1 s2 s3 s4 s5 in to_signal ~init ?eq th - let l6_s = Lwt_react.S.l6_s + let l6_s = Eio_react.S.l6_s let l6_s_init ~init ?eq f s1 s2 s3 s4 s5 s6 = let th = l6_s ?eq f s1 s2 s3 s4 s5 s6 in to_signal ~init ?eq th - let merge_s = Lwt_react.S.merge_s + let merge_s = Eio_react.S.merge_s let merge_s_init ~init ?eq f a l = let th = merge_s ?eq f a l in diff --git a/src/lib/eliom_stream.shared.ml b/src/lib/eliom_stream.shared.ml index 780fb67bff..8b207b83d0 100644 --- a/src/lib/eliom_stream.shared.ml +++ b/src/lib/eliom_stream.shared.ml @@ -104,7 +104,7 @@ end pending element. *) let clone s = (match s.source with - | Push_bounded _ -> invalid_arg "Lwt_stream.clone" + | Push_bounded _ -> invalid_arg "Eliom_stream.clone" | From _ | From_direct _ | Push _ -> ()); { source = s.source ; close = s.close @@ -235,7 +235,7 @@ class ['a] bounded_push_impl (info : 'a push_bounded) wakener_cell last close = method size = info.pushb_size method resize size = - if size < 0 then invalid_arg "Lwt_stream.bounded_push#resize"; + if size < 0 then invalid_arg "Eliom_stream.bounded_push#resize"; info.pushb_size <- size; if info.pushb_count < info.pushb_size && info.pushb_pending <> None then ( @@ -310,7 +310,7 @@ class ['a] bounded_push_impl (info : 'a push_bounded) wakener_cell last close = end let create_bounded size = - if size < 0 then invalid_arg "Lwt_stream.create_bounded"; + if size < 0 then invalid_arg "Eliom_stream.create_bounded"; (* Create the source for notifications of new elements. *) let info, wakener_cell = let waiter, wakener = Lwt.wait () in @@ -808,7 +808,8 @@ let iter_n ?(max_concurrency = 1) f stream = (if max_concurrency <= 0 then let message = - Printf.sprintf "Lwt_stream.iter_n: max_concurrency must be > 0, %d given" + Printf.sprintf + "Eliom_stream.iter_n: max_concurrency must be > 0, %d given" max_concurrency in invalid_arg message); @@ -940,7 +941,7 @@ let choose streams = let parse s f = (match s.source with - | Push_bounded _ -> invalid_arg "Lwt_stream.parse" + | Push_bounded _ -> invalid_arg "Eliom_stream.parse" | From _ | From_direct _ | Push _ -> ()); let node = s.node in Lwt.catch diff --git a/src/lib/eliom_stream.shared.mli b/src/lib/eliom_stream.shared.mli index 6a4c632e08..62c55d2393 100644 --- a/src/lib/eliom_stream.shared.mli +++ b/src/lib/eliom_stream.shared.mli @@ -55,7 +55,7 @@ val create_with_reference : unit -> 'a t * ('a option -> unit) * ('b -> unit) For example, to convert a reactive event to a stream: {[ - let stream, push, set_ref = Lwt_stream.create_with_reference () in + let stream, push, set_ref = Eliom_stream.create_with_reference () in set_ref (map_event push event) ]} *) @@ -79,11 +79,11 @@ class type ['a] bounded_push = object method push : 'a -> unit Lwt.t (** Pushes a new element to the stream. If the stream is full then it will block until one element is consumed. If another thread - is already blocked on [push], it raises {!Lwt_stream.Full}. *) + is already blocked on [push], it raises {!Eliom_stream.Full}. *) method close : unit (** Closes the stream. Any thread currently blocked on a call to - the [push] method fails with {!Lwt_stream.Closed}. *) + the [push] method fails with {!Eliom_stream.Closed}. *) method count : int (** Number of elements in the stream queue. *) @@ -161,13 +161,13 @@ val clone : 'a t -> 'a t For example: {[ - # let st1 = Lwt_stream.of_list [1; 2; 3];; - val st1 : int Lwt_stream.t = - # let st2 = Lwt_stream.clone st1;; - val st2 : int Lwt_stream.t = - # lwt x = Lwt_stream.next st1;; + # let st1 = Eliom_stream.of_list [1; 2; 3];; + val st1 : int Eliom_stream.t = + # let st2 = Eliom_stream.clone st1;; + val st2 : int Eliom_stream.t = + # lwt x = Eliom_stream.next st1;; val x : int = 1 - # lwt y = Lwt_stream.next st2;; + # lwt y = Eliom_stream.next st2;; val y : int = 1 ]} @@ -276,13 +276,13 @@ val junk_old : 'a t -> unit Lwt.t For example: {[ - # let st1 = Lwt_stream.of_list [1; 2; 3];; - val st1 : int Lwt_stream.t = - # let st2 = Lwt_stream.map string_of_int st1;; - val st2 : string Lwt_stream.t = - # lwt x = Lwt_stream.next st1;; + # let st1 = Eliom_stream.of_list [1; 2; 3];; + val st1 : int Eliom_stream.t = + # let st2 = Eliom_stream.map string_of_int st1;; + val st2 : string Eliom_stream.t = + # lwt x = Eliom_stream.next st1;; val x : int = 1 - # lwt y = Lwt_stream.next st2;; + # lwt y = Eliom_stream.next st2;; val y : string = "2" ]} *) @@ -394,7 +394,7 @@ val hexdump : char t -> string t let () = Lwt_main.run begin Lwt_io.write_lines Lwt_io.stdout - (Lwt_stream.hexdump (Lwt_io.read_lines Lwt_io.stdin)) + (Eliom_stream.hexdump (Lwt_io.read_lines Lwt_io.stdin)) end ]} *) diff --git a/src/lib/server/dune.server b/src/lib/server/dune.server index a0e827aa33..c1646c9f6f 100644 --- a/src/lib/server/dune.server +++ b/src/lib/server/dune.server @@ -1,3 +1,5 @@ +(rule (copy# ../eio_react.shared.ml eio_react.ml)) +(rule (copy# ../eio_react.shared.mli eio_react.mli)) (rule (copy# ../eliom.server.ml eliom.ml)) (rule (copy# ../eliom.server.mli eliom.mli)) (rule (copy# ../eliom_bus.server.ml eliom_bus.ml)) @@ -102,6 +104,8 @@ (rule (copy# ../eliom_shared_sigs.shared.mli eliom_shared_sigs.mli)) (rule (copy# ../eliom_state.server.ml eliom_state.ml)) (rule (copy# ../eliom_state.server.mli eliom_state.mli)) +(rule (copy# ../eliom_stream.shared.ml eliom_stream.ml)) +(rule (copy# ../eliom_stream.shared.mli eliom_stream.mli)) (rule (copy# ../eliom_syntax.server.ml eliom_syntax.ml)) (rule (copy# ../eliom_syntax.server.mli eliom_syntax.mli)) (rule (target eliom_tools.ml) (deps ../eliom_tools.eliom) From a65489fda07e02a6c170aed69eb3f85768c62579 Mon Sep 17 00:00:00 2001 From: Vincent Balat Date: Wed, 27 Aug 2025 16:27:04 +0200 Subject: [PATCH 05/21] Automatic changes done with ciao-lwt --- src/lib/client/eliommod_dom.ml | 95 ++- src/lib/client/eliommod_dom.mli | 2 +- src/lib/eio_react.shared.ml | 151 +++-- src/lib/eio_react.shared.mli | 111 ++-- src/lib/eliom_bus.client.ml | 102 +-- src/lib/eliom_bus.client.mli | 2 +- src/lib/eliom_bus.server.ml | 12 +- src/lib/eliom_bus.server.mli | 4 +- src/lib/eliom_client.client.ml | 699 +++++++++++---------- src/lib/eliom_client.client.mli | 28 +- src/lib/eliom_client.server.mli | 4 +- src/lib/eliom_client_core.client.ml | 33 +- src/lib/eliom_client_main.eliom | 12 +- src/lib/eliom_comet.client.ml | 215 ++++--- src/lib/eliom_comet.client.mli | 4 +- src/lib/eliom_comet.server.ml | 205 +++--- src/lib/eliom_comet.server.mli | 5 +- src/lib/eliom_common.server.ml | 74 +-- src/lib/eliom_common.server.mli | 21 +- src/lib/eliom_common_base.shared.ml | 6 +- src/lib/eliom_content.client.mli | 4 +- src/lib/eliom_cscache.eliom | 14 +- src/lib/eliom_cscache.eliomi | 6 +- src/lib/eliom_extension.server.ml | 6 +- src/lib/eliom_extension.server.mli | 4 +- src/lib/eliom_form.eliom | 45 +- src/lib/eliom_form.eliomi | 2 +- src/lib/eliom_form_sigs.shared.mli | 8 +- src/lib/eliom_mkreg.server.ml | 216 ++++--- src/lib/eliom_notif.server.ml | 39 +- src/lib/eliom_notif.server.mli | 8 +- src/lib/eliom_parameter_base.shared.ml | 22 +- src/lib/eliom_parameter_sigs.shared.mli | 8 +- src/lib/eliom_react.client.ml | 29 +- src/lib/eliom_react.client.mli | 4 +- src/lib/eliom_react.server.ml | 32 +- src/lib/eliom_reference.server.ml | 124 ++-- src/lib/eliom_reference.server.mli | 16 +- src/lib/eliom_registration.client.ml | 74 ++- src/lib/eliom_registration.client.mli | 4 +- src/lib/eliom_registration.server.ml | 171 +++-- src/lib/eliom_registration.server.mli | 12 +- src/lib/eliom_registration_sigs.shared.mli | 42 +- src/lib/eliom_request.client.ml | 208 +++--- src/lib/eliom_request.client.mli | 14 +- src/lib/eliom_request_info.client.ml | 18 +- src/lib/eliom_request_info.client.mli | 10 +- src/lib/eliom_request_info.server.ml | 6 +- src/lib/eliom_request_info.server.mli | 12 +- src/lib/eliom_route.client.ml | 7 +- src/lib/eliom_route.server.ml | 310 +++++---- src/lib/eliom_route.server.mli | 6 +- src/lib/eliom_route_base.shared.ml | 183 +++--- src/lib/eliom_service.client.ml | 2 +- src/lib/eliom_service.client.mli | 4 +- src/lib/eliom_service_base.eliom | 8 +- src/lib/eliom_service_sigs.shared.mli | 4 +- src/lib/eliom_shared.client.mli | 4 +- src/lib/eliom_shared.eliom | 250 ++++---- src/lib/eliom_shared.server.mli | 4 +- src/lib/eliom_shared_sigs.shared.mli | 32 +- src/lib/eliom_state.server.ml | 416 ++++++------ src/lib/eliom_state.server.mli | 99 ++- src/lib/eliom_stream.shared.ml | 141 +++-- src/lib/eliom_stream.shared.mli | 72 +-- src/lib/eliom_tools.eliom | 6 +- src/lib/eliom_tools.eliomi | 8 +- src/lib/server/eliommod.ml | 2 +- src/lib/server/eliommod.mli | 4 +- src/lib/server/eliommod_cookies.ml | 329 +++++----- src/lib/server/eliommod_cookies.mli | 20 +- src/lib/server/eliommod_gc.ml | 287 ++++----- src/lib/server/eliommod_pagegen.ml | 265 ++++---- src/lib/server/eliommod_pagegen.mli | 13 +- src/lib/server/eliommod_persess.ml | 155 +++-- src/lib/server/eliommod_persess.mli | 8 +- src/lib/server/eliommod_sessadmin.ml | 46 +- src/lib/server/eliommod_sessadmin.mli | 12 +- src/lib/server/eliommod_sessexpl.ml | 38 +- src/lib/server/eliommod_sessexpl.mli | 28 +- src/lib/server/eliommod_sessiongroups.ml | 261 ++++---- src/lib/server/eliommod_sessiongroups.mli | 16 +- src/lib/server/eliommod_timeouts.ml | 16 +- src/lib/server/monitor/eliom_monitor.ml | 124 ++-- src/lib/server/monitor/eliom_monitor.mli | 4 +- 85 files changed, 3089 insertions(+), 3038 deletions(-) diff --git a/src/lib/client/eliommod_dom.ml b/src/lib/client/eliommod_dom.ml index 54028c8fb9..14871c251e 100644 --- a/src/lib/client/eliommod_dom.ml +++ b/src/lib/client/eliommod_dom.ml @@ -1,4 +1,4 @@ -open Lwt.Syntax +open Eio.Std (* Ocsigen * http://www.ocsigen.org @@ -479,7 +479,7 @@ let fetch_linked_css e = let css = Eliom_request.http_get href [] Eliom_request.string_result in - acc @ [e, (e##.media, href, css >|= snd)] + acc @ [e, (e##.media, href, snd css)] | Dom.Element e -> let c = e##.childNodes in let acc = ref acc in @@ -578,26 +578,25 @@ let rewrite_css_url ~prefix css pos = let import_re = Regexp.regexp "@import\\s*" let rec rewrite_css ~max (media, href, css) = - Lwt.catch - (fun () -> - css >>= function - | None -> Lwt.return_nil - | Some css -> - if !Eliom_config.debug_timings - then Console.console##(time (Js.string ("rewrite_CSS: " ^ href))); - let* imports, css = - rewrite_css_import ~max ~prefix:(basedir href) ~media css 0 - in - if !Eliom_config.debug_timings - then Console.console##(timeEnd (Js.string ("rewrite_CSS: " ^ href))); - Lwt.return (imports @ [media, css])) - (fun _ -> Lwt.return [media, Printf.sprintf "@import url(%s);" href]) + try + match css with + | None -> [] + | Some css -> + if !Eliom_config.debug_timings + then Console.console##(time (Js.string ("rewrite_CSS: " ^ href))); + let imports, css = + rewrite_css_import ~max ~prefix:(basedir href) ~media css 0 + in + if !Eliom_config.debug_timings + then Console.console##(timeEnd (Js.string ("rewrite_CSS: " ^ href))); + imports @ [media, css] + with _ -> [media, Printf.sprintf "@import url(%s);" href] and rewrite_css_import ?(charset = "") ~max ~prefix ~media css pos = match Regexp.search import_re css pos with | None -> (* No @import anymore, rewrite url. *) - Lwt.return ([], rewrite_css_url ~prefix css pos) + [], rewrite_css_url ~prefix css pos | Some (i, res) -> ( (* Found @import rule, try to preload. *) let init = String.sub css pos (i - pos) in @@ -606,45 +605,46 @@ and rewrite_css_import ?(charset = "") ~max ~prefix ~media css pos = let i = i + String.length (Regexp.matched_string res) in let i, href = parse_url ~prefix css i in let i, media' = parse_media css i in - let* import = - if max = 0 - then - (* Maximum imbrication of @import reached, rewrite url. *) - Lwt.return - [media, Printf.sprintf "@import url('%s') %s;\n" href media'] - else if media##.length > 0 && String.length media' > 0 - then - (* TODO combine media if possible... + let (imports, css), import = + Fiber.pair + (fun () -> rewrite_css_import ~charset ~max ~prefix ~media css i) + (fun () -> + if + (* TODO: ciao-lwt: This computation might not be suspended correctly. *) + max = 0 + then + (* Maximum imbrication of @import reached, rewrite url. *) + [media, Printf.sprintf "@import url('%s') %s;\n" href media'] + else if media##.length > 0 && String.length media' > 0 + then + (* TODO combine media if possible... in the mean time keep explicit import. *) - Lwt.return - [media, Printf.sprintf "@import url('%s') %s;\n" href media'] - else - let media = - if media##.length > 0 then media else Js.string media' - in - let css = - Eliom_request.http_get href [] Eliom_request.string_result - in - rewrite_css ~max:(max - 1) (media, href, css >|= snd) - and* imports, css = - rewrite_css_import ~charset ~max ~prefix ~media css i + [media, Printf.sprintf "@import url('%s') %s;\n" href media'] + else + let media = + if media##.length > 0 then media else Js.string media' + in + let css = + Eliom_request.http_get href [] Eliom_request.string_result + in + rewrite_css ~max:(max - 1) (media, href, snd css)) in - Lwt.return (import @ imports, css) + import @ imports, css with - | Incorrect_url -> Lwt.return ([], rewrite_css_url ~prefix css pos) + | Incorrect_url -> [], rewrite_css_url ~prefix css pos | exn -> Logs.info ~src:section (fun fmt -> fmt ("Error while importing css" ^^ "@\n%s") (Printexc.to_string exn)); - Lwt.return ([], rewrite_css_url ~prefix css pos)) + [], rewrite_css_url ~prefix css pos) let max_preload_depth = ref 4 let build_style (e, css) = - let* css = rewrite_css ~max:!max_preload_depth css in - (* lwt css = *) - Lwt_list.map_p + let css = rewrite_css ~max:!max_preload_depth css in + Fiber.List.map + (* lwt css = *) (fun (media, css) -> let style = Dom_html.createStyle Dom_html.document in style##._type := Js.string "text/css"; @@ -655,7 +655,7 @@ let build_style (e, css) = if Js.Optdef.test styleSheet then Js.Unsafe.(set styleSheet (Js.string "cssText") (Js.string css)) else style##.innerHTML := Js.string css; - Lwt.return (e, (style :> Dom.node Js.t))) + e, (style :> Dom.node Js.t)) css (* IE8 doesn't allow appendChild on noscript-elements *) @@ -669,7 +669,7 @@ let build_style (e, css) = let preload_css (doc : Dom_html.element Js.t) = if !Eliom_config.debug_timings then Console.console##(time (Js.string "preload_css (fetch+rewrite)")); - let* css = Lwt_list.map_p build_style (fetch_linked_css (get_head doc)) in + let css = Fiber.List.map build_style (fetch_linked_css (get_head doc)) in let css = List.concat css in List.iter (fun (e, css) -> @@ -682,8 +682,7 @@ let preload_css (doc : Dom_html.element Js.t) = section (fun fmt -> fmt "Unique CSS skipped...")) css; if !Eliom_config.debug_timings - then Console.console##(timeEnd (Js.string "preload_css (fetch+rewrite)")); - Lwt.return_unit + then Console.console##(timeEnd (Js.string "preload_css (fetch+rewrite)")) (** Window scrolling *) diff --git a/src/lib/client/eliommod_dom.mli b/src/lib/client/eliommod_dom.mli index 5286bd8e50..70267dcd90 100644 --- a/src/lib/client/eliommod_dom.mli +++ b/src/lib/client/eliommod_dom.mli @@ -71,7 +71,7 @@ val html_document : (** Assuming [d] has a body and head element, [html_document d] will return the same document as html *) -val preload_css : Dom_html.element Js.t -> unit Lwt.t +val preload_css : Dom_html.element Js.t -> unit (** [preload_css e] downloads every css included in every link elements that is a descendant of [e] and replace it and its linked css by inline [