Friday, July 31, 2015

FRP without Space Leaks

The dataflow engine I gave in my last post can be seen as an implementation of self-adjusting computation, in the style of Acar, Blelloch and Harper's original POPL 2002 paper Adaptive Functional Programming. (Since then, state of the art implementation techniques have improved a lot, so don't take my post as indicative of what modern libraries do.)

Many people have seen resemblances between self-adjusting computation and functional reactive programming --- a good example of this is Jake Donham's Froc library for Ocaml. Originally, I was one of those people, but that's no longer true: I think SAC and FRP are completely orthogonal.

I now think that FRP libraries can be very minimalistic --- my ICFP 2013 paper Higher-Order Reactive Programming without Spacetime Leaks gives a type system, implementation, and correctness proof for an FRP language with full support for higher order constructions like higher-order functions and streams of streams, while at the same time statically ruling out space and time leaks.

The key idea is to distinguish stable values (like ints and bools), whose representation doesn't change over time, from dynamic values (like streams), whose representation is time-varying. Stable values are the usual datatypes, and can be used whenever we like. But dynamic values have a scheduling constraint: we can only use them at certain times. For example, with a stream, we want to look at the head at time 0, the head of the tail at time 1, the head of the tail of the tail at time 2, and so on. It's a mistake to look at the head of the tail of a stream at time 0, because that value might not be available yet.

With an appropriate type discipline, it's possible to ensure scheduling correctness statically, but unfortunately many people are put off by modal types and Kripke logical relations. This is a shame, because the payoff of all this is that the implementation strategy is super-simple -- we can just use plain-vanilla lazy evaluation to implement FRP.

Recently, though, I've figured out how to embed this kind of FRP library into standard functional languages like Ocaml. Since we can't define modal type operators in standard functional languages, we have to give up some static assurance, and replace the static checks of time-correctness with dynamic checks, but we are still able to rule out space leaks by construction, and still get a runtime error if we mis-schedule a program. Essentially, we can replace type checking with contract checking. As usual, you can find the code on Github here.

Let's look at an Ocaml signature for this library:

module type NEXT =
sig
  type 'a t
  exception Timing_error of int * int 

  val delay  : (unit -> 'a) -> 'a t
  val map    : ('a -> 'b) -> 'a t -> 'b t
  val zip    : 'a t * 'b t -> ('a * 'b) t
  val unzip  : ('a * 'b) t -> 'a t * 'b t
  val ($)    : ('a -> 'b) t -> 'a t -> 'b t   (* This op is redundant but convenient *)
  val fix    : ('a t -> 'a) -> 'a

  (* Use these operations to implement an event loop *)

  module Runtime : sig
    val tick : unit -> unit
    val force : 'a t -> 'a
  end
end

The NEXT signature introduces a single type constructor 'a t, which can be thought of as the type of computations which are scheduled to be evaluated on precisely the next tick of the clock. The elements of 'a t are dynamic in the sense that I mention above: we are only permitted to evaluate them on the next tick of the clock, and evaluating them at any other time is an error.

To model this kind of error, we also have a Timing_error exception, which signals an error whose first argument contains the time a thunk was scheduled to be evaluated, and whose second argument contains the actual time.
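
As a rough illustration of what this contract check looks like from the outside, here is a minimal sketch. The Next module below is a cut-down stand-in written for this example only (it has no thunk registry), and try_force and show are hypothetical helpers, not part of the library.

```ocaml
(* Cut-down stand-in for the post's Next module: just enough to show the
   timing check. The registry of thunks is omitted here. *)
module Next = struct
  exception Timing_error of int * int
  let time = ref 0
  type 'a t = { time : int; code : 'a Lazy.t }
  let delay f = { time = !time + 1; code = Lazy.from_fun f }
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code
  let tick () = incr time
end

(* Hypothetical helper: turn the exception into a result for inspection. *)
let try_force x =
  match Next.force x with
  | v -> Ok v
  | exception Next.Timing_error (scheduled, actual) ->
    Error (scheduled, actual)

(* Hypothetical helper: render the outcome as a string. *)
let show = function
  | Ok v -> Printf.sprintf "ok %d" v
  | Error (s, a) -> Printf.sprintf "Timing_error (%d, %d)" s a

let () =
  let x = Next.delay (fun () -> 42) in  (* scheduled for time 1 *)
  print_endline (show (try_force x));   (* time 0: too early *)
  Next.tick ();
  print_endline (show (try_force x));   (* time 1: on schedule *)
  Next.tick ();
  print_endline (show (try_force x))    (* time 2: too late *)
```

Run as written, this should print Timing_error (1, 0), then ok 42, then Timing_error (1, 2): the first component is always the scheduled time, the second the time at which the force was attempted.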

Elements of 'a t are the only primitive way to create dynamic values -- other values (like function closures) can be dynamic, but only if they end up capturing a next-step thunk.

The delay function lets us create a next value from a thunk, and the map function maps a function over a thunk. The zip and unzip functions are used for pairing, and the $ operation is the McBride/Paterson idiomatic application operator. (Technically, it's derivable from zip and map, but it's easiest to throw it in to the basic API.)
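
To make the remark about redundancy concrete, here is one way the application operator could be recovered from pairing and mapping. This is only a sketch: the Next module below is a simplified stand-in defined for the example, without the thunk registry.

```ocaml
(* Simplified stand-in for Next, with only the operations used below. *)
module Next = struct
  exception Timing_error of int * int
  let time = ref 0
  type 'a t = { time : int; code : 'a Lazy.t }
  let delay f = { time = !time + 1; code = Lazy.from_fun f }
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code
  let map f r = delay (fun () -> f (force r))
  let zip (r, r') = delay (fun () -> (force r, force r'))
  let tick () = incr time
end

(* Idiomatic application built only from zip and map: pair the delayed
   function with the delayed argument, then apply inside the pair. *)
let ( $ ) f x = Next.map (fun (g, a) -> g a) (Next.zip (f, x))

let result =
  let f = Next.delay (fun () -> fun n -> n * 2) in
  let x = Next.delay (fun () -> 21) in
  let r = f $ x in   (* built at time 0, so it is due at time 1 *)
  Next.tick ();
  Next.force r

let () = Printf.printf "%d\n" result
```

All three thunks are created on the same tick, so they all come due together and the program prints 42.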

The fix operation is the one that really makes reactive programming possible -- it says that guarded recursion is allowed. So if we have a function which takes an 'a t and returns an 'a, then we can take a fixed point. This fixed point will never block the event loop, because its type ensures that we always delay by a tick before making a recursive call.

This raw interface is, honestly, not so useful as is, but the slightly miraculous fact is that this is the complete API we need to build all the higher-level abstractions --- like events and streams --- that we need to do real reactive programming.

Now, let's see what an implementation of this library could look like.

module Next : NEXT = struct
  let time = ref 0

We can keep track of the current time in a reference cell.

  type 'a t = {
    time : int;
    mutable code : 'a Lazy.t
  }

The type of a thunk is a record consisting of a lazy thunk, and the time when it is safe to force it.

  type s = Hide : 'a t -> s 
  let thunks : s list ref = ref []

We also have a list that stores all of the references that we've allocated. We'll use this list to enforce space-safety, by mutating any thunk that gets too old.

  exception Timing_error of int * int 

  let delay t =
    let t = { time = 1 + !time; code = Lazy.from_fun t} in
    thunks := (Hide t) :: !thunks;
    t

When we create a thunk with the delay function, we are creating a thunk to be forced on the next time tick. So we can dereference time in order to find out the current time, and add 1 to get the scheduled execution time for the thunk. We also add it to the list thunks, so that we can remember that we created it.
  

  let force t =
     if t.time <> !time then
       raise (Timing_error(t.time, !time))
     else
       Lazy.force t.code

Forcing a thunk just forces the code thunk, if the current time matches the scheduled time for the thunk. Otherwise, we raise a Timing_error. Note that memoization is handled by Ocaml's built-in 'a Lazy.t type.
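
The memoization mentioned here is ordinary standard-library behaviour and can be checked in isolation. In the small sketch below, runs and cell are names invented for the example.

```ocaml
(* Counts how many times the suspended body is actually evaluated. *)
let runs = ref 0

(* Lazy.from_fun is the same constructor that delay uses. *)
let cell = Lazy.from_fun (fun () -> incr runs; 6 * 7)

let () =
  let a = Lazy.force cell in   (* runs the body *)
  let b = Lazy.force cell in   (* returns the cached result *)
  Printf.printf "%d %d, body ran %d time(s)\n" a b !runs
```

This prints 42 42, body ran 1 time(s), so forcing the same next-step thunk several times within its tick does not repeat the work.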

   let map f r = delay (fun () -> f (force r))
   let zip (r, r') = delay (fun () -> (force r, force r'))
   let unzip r = (map fst r, map snd r)
   let ($) f x = delay (fun () -> force f (force x))
 
The map, zip, unzip, and ($) operators just force and delay things in the obvious places.
 
   let rec fix f = f (delay (fun () -> fix f))
 
The fixed point operation looks exactly like the standard lazy fixed point.
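
One way to convince yourself that this definition does not unfold forever is to count the unfoldings. The sketch below is an illustration under some assumptions: Next is a simplified stand-in without the thunk registry, and counter, unfoldings and count_from are names made up for the example.

```ocaml
(* Simplified stand-in for Next, without the thunk registry. *)
module Next = struct
  exception Timing_error of int * int
  let time = ref 0
  type 'a t = { time : int; code : 'a Lazy.t }
  let delay f = { time = !time + 1; code = Lazy.from_fun f }
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code
  let ( $ ) f x = delay (fun () -> force f (force x))
  let rec fix f = f (delay (fun () -> fix f))
  let tick () = incr time
end

let ( $ ) = Next.( $ )

(* Hypothetical type: a value now, and the rest on the next tick. *)
type counter = Step of int * counter Next.t

(* Number of times the body passed to fix has been entered. *)
let unfoldings = ref 0

let count_from start =
  Next.fix (fun loop ->
      incr unfoldings;   (* once per unfolding of the fixed point *)
      fun n -> Step (n, loop $ Next.delay (fun () -> n + 1)))
    start

let () =
  let rec go k (Step (n, rest)) =
    if k > 0 then begin
      Printf.printf "time %d: value %d, unfoldings so far %d\n"
        !Next.time n !unfoldings;
      Next.tick ();
      go (k - 1) (Next.force rest)
    end
  in
  go 3 (count_from 10)
```

Each line of output shows one more unfolding than the previous one: the recursive call sits behind a next-step thunk, so it only happens when the event loop ticks and forces it.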
 
   module Runtime = struct
     let force = force
 
The runtime exposes the force function to the event loop.
 
     let cleanup (Hide t) =
       let b = t.time < !time in
       (if b then t.code <- lazy (raise (assert false)));
       b
 
     let tick () =
       time := !time + 1;
       thunks := List.filter (fun t -> not (cleanup t)) !thunks
  end
end

The tick function advances time by doing two things. First, it increments the current time. Then, it filters the list of thunks using the cleanup function, which also does two things. First, cleanup returns true if its argument is older than the current time. Since List.filter keeps the elements for which its predicate holds, tick negates this test, and as a result we only retain thunks in thunks which can be forced now or in the future.

Second, if the argument thunk to cleanup is old, it replaces the code body with an assertion failure, since no time-correct program should ever force this thunk. Updating the code ensures that by construction next-step thunks always lose their reference to their data once they age out, because every thunk is placed onto thunks when it is created, and when the clock is ticked past its time, it is guaranteed to drop its references to its data.

This guarantees that spacetime leaks are impossible, since we dynamically zero out any thunks that get too old! So here we see how essential data abstraction is for imperative programming, and not just functional programming.
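
The effect of the registry can be observed directly if we leave the module unsealed. The following is a sketch rather than the library itself: it keeps a thunk registered only while it can still be forced, stale plays the role of cleanup, and live is a hypothetical helper added for inspection.

```ocaml
module Next = struct
  let time = ref 0
  type 'a t = { time : int; mutable code : 'a Lazy.t }
  type s = Hide : 'a t -> s
  let thunks : s list ref = ref []
  exception Timing_error of int * int

  let delay f =
    let t = { time = 1 + !time; code = Lazy.from_fun f } in
    thunks := Hide t :: !thunks;
    t

  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code

  (* Null out a thunk that has aged out; report whether it was stale. *)
  let stale (Hide t) =
    let b = t.time < !time in
    (if b then t.code <- lazy (assert false));
    b

  let tick () =
    incr time;
    thunks := List.filter (fun h -> not (stale h)) !thunks

  (* Hypothetical helper, for this sketch only. *)
  let live () = List.length !thunks
end

let () =
  let payload = Array.make 1000 0 in
  let x = Next.delay (fun () -> Array.length payload) in
  Printf.printf "time 0: %d registered\n" (Next.live ());
  Next.tick ();
  Printf.printf "time 1: forced to %d\n" (Next.force x);
  Printf.printf "time 1: cached result held: %b\n" (Lazy.is_val x.Next.code);
  Next.tick ();
  Printf.printf "time 2: %d registered\n" (Next.live ());
  Printf.printf "time 2: cached result held: %b\n" (Lazy.is_val x.Next.code)
```

Even though the program still holds on to x at time 2, the record no longer points at the closure over payload or at the cached result, so both are free to be collected.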

As you can see, the implementation of the Next library is pretty straightforward. The only mildly clever thing we do is to keep track of the next-tick computations so we can null them out when they get too old.

You should be wondering now how we can actually write reactive programs, when the primitive the API provides only lets you schedule a computation to run on the next tick, and that's it. The answer is datatype declarations. Now that we have a type that lets us talk about time, we can re-use our host language's facility to define types which say more interesting things about time.

Let's start with the classic datatype of functional reactive programming: streams. Streams are a kind of lazy sequence, which recursively give you a value now, and a stream starting tomorrow, thereby giving you a value on every time step.

module Stream :
  sig
    type 'a stream = Cons of 'a * 'a stream Next.t
    val head : 'a stream -> 'a
    val tail : 'a stream -> 'a stream Next.t
    val unfold : ('a -> 'b * 'a Next.t) -> 'a -> 'b stream
    val map : ('a -> 'b) -> 'a stream -> 'b stream
    val zip : 'a stream * 'b stream -> ('a * 'b) stream
    val unzip : ('a * 'b) stream -> 'a stream * 'b stream
  end = struct

We give a simple signature for streams above. They are a datatype exactly following the English description above, as well as a collection of accessor and constructor functions, like head, tail, map, unfold and so on. All of these have pretty much the expected types.

The only difference from the usual stream types is that sometimes we need a Next.t to tell us when a value needs to be available. Now let's look at the implementation.


  open Next

  type 'a stream = Cons of 'a * 'a stream Next.t


  let head (Cons(x, xs)) = x
  let tail (Cons(x, xs)) = xs

We can write accessor functions for streams, for convenience.

  let map f = fix (fun loop (Cons(x, xs)) -> Cons(f x, loop $ xs))

The map function uses the fix fixed point operator in our API, because we want to call the recursive function at a later time.
  
  let unfold f = fix (fun loop seed ->
      let (x, seed) = f seed in
      Cons(x, loop $ seed))

The unfold function uses a function f and an initial seed value to incrementally produce a sequence of values. This is exactly like the usual unfold, except we have to use the applicative interface to the 'a Next.t type to apply the function.
  

  let zip pair =
    unfold (fun (Cons(x, xs), Cons(y, ys)) -> ((x, y), Next.zip (xs, ys)))
           pair

  let unzip xys = fix (fun loop (Cons((x,y), xys')) ->
                         let (xs', ys') = Next.unzip (loop $ xys') in
                         (Cons(x, xs'), Cons(y, ys')))
                      xys
  end

zip and unzip work about the way we'd expect, in that we use Next.zip and Next.unzip to put together and take apart delayed pairs to build the ability to put together and take apart streams.

This is all very nice, but the real power of giving a reactive API based on a next-step type is that we can build types which aren't streams. For example, let's give a datatype of events, which is the type of values which will become available at some point in the future, but we don't know exactly when.

module Event :
  sig
    type 'a event = Now of 'a | Wait of 'a event Next.t
    val map : ('a -> 'b) -> 'a event -> 'b event
    val return : 'a -> 'a event
    val bind : 'a event -> ('a -> 'b event) -> 'b event
    val select : 'a event -> 'a event -> 'a event
  end
= struct
  open Next

  type 'a event = Now of 'a | Wait of 'a event Next.t

We represent this with a datatype 'a event, which has two constructors. We say that an 'a event is either a value of type 'a available Now, or we have to Wait to get another event tomorrow. So this is a single value of type 'a that could come at any time --- and we don't know when!

  let map f = fix (fun loop e ->
      match e with
      | Now x -> Now (f x)
      | Wait e' -> Wait (loop $ e'))

We can map over events, by waiting until the value becomes available and then applying a function to the result.

  let return x = Now x

  let bind m f =
    fix (fun bind m ->
        match m with 
        | Now v -> f v
        | Wait e' -> Wait (bind $ e'))
      m

Events also form a monad, which corresponds to the ability to sequence promises or futures in the promises libraries you'll find in Javascript or Scala. (The bind here is a bit like the promise.then() method in JS.)

  let select e1 e2 =
    fix (fun loop e1 e2 -> 
          match e1, e2 with 
          | Now a1, _ -> Now a1
          | _, Now a2 -> Now a2
          | Wait e1, Wait e2 -> Wait (loop $ e1 $ e2))
      e1
      e2
end

The really cool thing is that we can also join on two events to wait for the first one to complete! This can be extended to lists of events, if desired, but the pattern is easiest to see in the binary case.
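
Here is a sketch of how select might be driven from an event loop. Everything outside select is an assumption made for the example: Next is a simplified stand-in without the thunk registry, after builds an event that fires after a given number of ticks, and await ticks the clock until an event fires.

```ocaml
(* Simplified stand-in for Next, without the thunk registry. *)
module Next = struct
  exception Timing_error of int * int
  let time = ref 0
  type 'a t = { time : int; code : 'a Lazy.t }
  let delay f = { time = !time + 1; code = Lazy.from_fun f }
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code
  let ( $ ) f x = delay (fun () -> force f (force x))
  let rec fix f = f (delay (fun () -> fix f))
  let tick () = incr time
end

let ( $ ) = Next.( $ )

type 'a event = Now of 'a | Wait of 'a event Next.t

(* Same definition as in the post. *)
let select e1 e2 =
  Next.fix (fun loop e1 e2 ->
      match e1, e2 with
      | Now a1, _ -> Now a1
      | _, Now a2 -> Now a2
      | Wait e1, Wait e2 -> Wait (loop $ e1 $ e2))
    e1 e2

(* Hypothetical helper: an event that fires with x after n ticks. *)
let after n x =
  Next.fix (fun loop n ->
      if n = 0 then Now x
      else Wait (loop $ Next.delay (fun () -> n - 1)))
    n

(* Hypothetical event loop: tick until the event fires, counting ticks. *)
let rec await ticks = function
  | Now v -> (v, ticks)
  | Wait e -> Next.tick (); await (ticks + 1) (Next.force e)

let () =
  let (winner, ticks) = await 0 (select (after 2 "slow") (after 1 "fast")) in
  Printf.printf "%s fired after %d tick(s)\n" winner ticks
```

This prints fast fired after 1 tick(s). Note that both events are created on the same tick as the select that combines them, which is what the scheduling discipline requires.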

Finally, here's a small example of how you can put this together to run a program. The run function gives an event loop that runs for k steps and halts, and prints out the first k elements of the stream it gets passed as an argument.

module Test =
struct
  open Next
  open Stream

  let ints n = unfold (fun i -> (i, delay(fun () -> i+1))) n 

  let rec run k xs =
    if k = 0
    then ()
    else
      let (x, xs) = (head xs, tail xs) in
      Printf.printf "%d\n" x;
      Runtime.tick();
      run (k-1) (Runtime.force xs)
end
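
For readers who want something they can paste into a file and run, here is a condensed, self-contained variant of the test. It is a sketch under some assumptions: Next is a simplified stand-in without the thunk registry, and take is a hypothetical variant of run that collects the elements in a list instead of printing them as it goes.

```ocaml
(* Simplified stand-in for Next, without the thunk registry. *)
module Next = struct
  exception Timing_error of int * int
  let time = ref 0
  type 'a t = { time : int; code : 'a Lazy.t }
  let delay f = { time = !time + 1; code = Lazy.from_fun f }
  let force t =
    if t.time <> !time then raise (Timing_error (t.time, !time))
    else Lazy.force t.code
  let ( $ ) f x = delay (fun () -> force f (force x))
  let rec fix f = f (delay (fun () -> fix f))
  let tick () = incr time
end

let ( $ ) = Next.( $ )

type 'a stream = Cons of 'a * 'a stream Next.t

let unfold f =
  Next.fix (fun loop seed ->
      let (x, seed') = f seed in
      Cons (x, loop $ seed'))

let map f = Next.fix (fun loop (Cons (x, xs)) -> Cons (f x, loop $ xs))

let ints n = unfold (fun i -> (i, Next.delay (fun () -> i + 1))) n

(* Like run, but returns the first k elements. The clock is ticked once
   per element, exactly as the event loop in run does. *)
let rec take k (Cons (x, xs)) =
  if k = 0 then []
  else begin
    Next.tick ();
    x :: take (k - 1) (Next.force xs)
  end

let () =
  take 5 (map (fun i -> i * i) (ints 0))
  |> List.iter (Printf.printf "%d\n")
```

This prints the squares 0, 1, 4, 9 and 16, one per line. The stream has to be consumed on the tick it was built on, which is why the whole pipeline is constructed inside the call to take.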

10 comments:

  1. I'm confused. It's been a little while since I've written OCaml, but it looks like your Next.fix takes only a single argument of function type, but your Stream.map is calling Next.fix with a two argument function, ie. fix (fun loop (Cons(x, xs)) -> ...)

    What am I missing?

    Replies
    1. Because of currying, a "two argument" function is really a one argument function returning another one argument function. So defining n-ary recursive functions works just fine.

      What is possible but annoying to do is to define mutually recursive functions, because then you need to take the fixed point of a whole tuple of functions at once. Semantically it makes perfect sense but the syntax is pretty ugly.

    2. Right, I realized belatedly that it was probably the currying, which I'm just not used to seeing anymore.

      While implementing this in C#, it wasn't clear that Next.fix was actually needed for Stream and Event. Next.fix simply recurses indefinitely with a delay at each tick.

      Now consider Stream.Select from that link, which is .NET's map function. It invokes Next.Select (map), which itself already calls Next.delay, and the subsequent Next constructed for that Stream element simply calls Stream.Next recursively to continue this process indefinitely. That would seem to replicate the semantics of Next.fix without all the intermediate closures.

      Is that sufficient, or am I missing something obvious here too?

    3. Yes, that works fine with the current implementation. I'm personally leery of doing that because in the denotational semantics, the existence of fixed points is due to a property of the next-step type. As a result, it's possible in principle to give an implementation where the native fixed point is not the one you want, and so this leaks information about the implementation strategy. (However, it seems unlikely that you'd ever want such an implementation, so you're almost certainly okay -- it just rubs my semanticist intuition the wrong way.)

    4. C# is already pretty leaky, so that seems like the best choice here. Finally added join with explicit forcing (I think it's correct), and the simple console test works. Very nice!

  2. This comment has been removed by a blog administrator.

  3. A question...

    It looks like this implementation is geared towards applications which are tightly synchronized, in that all streams change at the same rate. If you have an event source which changes rarely (e.g. a configuration file) then its rate needs to be increased to the rate of the fastest event source.

    Does this seem correct to you? There are advantages to synchronized implementations (notably a lot less buffering) but also costs (lots of "my value hasn't changed" events).

  4. Hi Alan, yes, you're right. Type-theoretically, the whole setup is basically the modal mu-calculus with a next-step constructor. This means that there is a single global notion of time which everything is synchronized with respect to. You could call this style of FRP "higher-order synchronous dataflow", if you like.

    Replies
    1. Unfortunately, most people think synchronous means as in I/O, hence calling it synchronized FRP. The naming of APIs is a difficult matter.

  5. The Oz language has "Declarative variables" which can be bound only once, and act as a sort of first-class futures.
    Streams are implemented as a pair of "current value" and "next value" variables, in a way that resembles the NEXT treatment here.
