I'm trying to implement a ReactiveX-like graph engine in Rust (somewhat similar to RxJava/RxJS and friends), but geared towards synchronous computation only, with a focus on caching intermediate values. It's kind of like the standard library iterators, but backwards: values are pushed down from the source instead of being pulled by the consumer.
I've implemented a toy prototype of the Observable/Observer pattern (Playground). Although it's not quite what I want in the end, it demonstrates the general spirit of things.
Basically, I'd want code that looks like this (pseudocode):
let mut input = Input();
let mut x = input.map(|i| i * 2);
// unavoidable multiple mut borrow?..
let mut y = x.fork().map(|x| x + 3); // [must_use]
y.subscribe(|y| println!("y={}", y)); // moved
let mut z = x.fork().map(|x| x - 4).do_more_stuff(); // [must_use]
z.subscribe(|z| println!("z={}", z)); // moved
// some time later:
input.feed(42);
This should be unrolled into roughly the following:
|i| {
    let x = i * 2;
    let y = x + 3;
    println!("y={}", y);
    let z = do_more_stuff(x - 4);
    println!("z={}", z);
}
The syntax is important here. I know how to do it "backwards", or how to modify it significantly so that it kind of works, but I'd like to keep the "top-down" syntax if possible for readability's sake, the 'forks' in particular.
So you can think of it as a directed graph, with a single input at the top, and each node doing some computation and passing the result (possibly multiple times; or maybe not passing it) to the bottom levels, potentially "forking" so that the value can be passed to multiple lower-level nodes without recomputing. At the very bottom, there are "subscribers/observers" who can listen to values and do anything with them. Once created, this graph (i.e., the graph structure) is completely immutable.
Many Rx implementations manage to do this without explicitly carrying the value around as a struct field, because the value lives in the parent closure.
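To illustrate the "value lives in the parent closure" point, here is a rough sketch built from plain nested closures. The free functions map and fork are hypothetical helpers made up for this example (not from any library), the graph is wired bottom-up (i.e. the "backwards" form, not the syntax I'm after), and T: Copy is assumed so the forked value can be handed to both children.

```rust
use std::cell::RefCell;

/// Hypothetical helper: applies `f`, then hands the result to `next`.
fn map<A, B>(f: impl Fn(A) -> B, mut next: impl FnMut(B)) -> impl FnMut(A) {
    move |a| next(f(a))
}

/// Hypothetical helper: passes the same value to both children.
/// `T: Copy` is an assumption of this sketch.
fn fork<T: Copy>(mut left: impl FnMut(T), mut right: impl FnMut(T)) -> impl FnMut(T) {
    move |t| {
        left(t);
        right(t);
    }
}

/// Wires the graph from the question bottom-up and feeds one value through it.
fn demo(i: i32) -> Vec<String> {
    let log: RefCell<Vec<String>> = RefCell::new(Vec::new());
    {
        let y = map(|x: i32| x + 3, |y: i32| log.borrow_mut().push(format!("y={}", y)));
        let z = map(|x: i32| x - 4, |z: i32| log.borrow_mut().push(format!("z={}", z)));
        // `x` is never stored in a struct: it only exists as the argument
        // flowing through the nested closures.
        let mut input = map(|i: i32| i * 2, fork(y, z));
        input(i);
    }
    log.into_inner()
}

fn main() {
    for line in demo(42) {
        println!("{}", line);
    }
}
```

Since everything here is generic over closure types, there is no boxing, so the compiler is free to inline the whole chain, which is the "unrolled" shape I'm hoping for.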
I've spent a considerable amount of time thinking and prototyping in Rust, but can't quite figure out a solution that would be both fast and 'nice', i.e. ergonomic, as in the samples above. For instance, with fork() (which splits the stream), the desired syntax seems to immediately imply multiple mutable borrows of the parent, even if the child later gets moved?
The core ideas (implemented in the playground linked above) that I think would work are: an Observer concept, basically a trait requiring the implementor to provide on_next(&mut self, value: T), with a blanket implementation so that any FnMut(T) automatically implements it. Then, a "fork" is something that holds a list of Box<dyn Observer<T>> so that it can call each of them. Perhaps there is also a "stream/observable" that accepts an observer subscribing to it (in a mutable fashion?).
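For reference, a minimal sketch of these ideas could look like the following. This is not the playground code itself; the Fork struct, its subscribe method and the T: Clone bound are assumptions made for this example.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Anything that can receive values pushed from upstream.
trait Observer<T> {
    fn on_next(&mut self, value: T);
}

/// Blanket impl: any `FnMut(T)` closure can be used as an observer.
impl<T, F: FnMut(T)> Observer<T> for F {
    fn on_next(&mut self, value: T) {
        self(value)
    }
}

/// Fans each value out to a list of boxed observers.
/// `T: Clone` is an assumption of this sketch; passing `&T` would also work.
struct Fork<T> {
    observers: Vec<Box<dyn Observer<T>>>,
}

impl<T: Clone> Fork<T> {
    fn new() -> Self {
        Fork { observers: Vec::new() }
    }

    fn subscribe(&mut self, observer: impl Observer<T> + 'static) {
        self.observers.push(Box::new(observer));
    }
}

impl<T: Clone> Observer<T> for Fork<T> {
    fn on_next(&mut self, value: T) {
        for observer in self.observers.iter_mut() {
            observer.on_next(value.clone());
        }
    }
}

/// Builds a tiny graph and returns what the subscribers recorded.
fn demo() -> Vec<String> {
    let log: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
    let mut fork: Fork<i32> = Fork::new();

    let log_y = Rc::clone(&log);
    fork.subscribe(move |x: i32| log_y.borrow_mut().push(format!("y={}", x + 3)));
    let log_z = Rc::clone(&log);
    fork.subscribe(move |x: i32| log_z.borrow_mut().push(format!("z={}", x - 4)));

    // The upstream `map(|i| i * 2)` step is itself just a closure observer.
    let mut input = move |i: i32| fork.on_next(i * 2);
    input.on_next(42);

    let result = log.borrow().clone();
    result
}

fn main() {
    for line in demo() {
        println!("{}", line);
    }
}
```

The boxed observers cost a dynamic dispatch per subscriber, which is part of why I'm not sure this is the "fast" solution I'm after.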
Anyway, maybe I'm hitting my head against a well-known wall here, or maybe there are better solutions to this (or perhaps this has already been implemented somewhere) -- any thoughts would be much appreciated.
Or perhaps fork() could consume itself and return two independent objects instead?
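A rough sketch of that last idea, under several assumptions: the Stream and Input types, the stream_from and emit helpers, and the shared Rc<RefCell<...>> subscriber lists are all hypothetical and invented for this example, and T: Copy is assumed. Because fork takes self by value, the two returned handles can be chained and moved independently, so the multiple-mutable-borrow problem does not arise.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// A boxed downstream consumer.
type Sink<T> = Box<dyn FnMut(T)>;
/// Shared list of consumers attached to one node of the graph.
type Sinks<T> = Rc<RefCell<Vec<Sink<T>>>>;

/// Hypothetical stream handle: `attach` wires one consumer to the parent node.
struct Stream<T> {
    attach: Box<dyn FnOnce(Sink<T>)>,
}

/// Hypothetical entry point: values fed here flow down the graph.
struct Input<T> {
    sinks: Sinks<T>,
}

/// Builds a stream whose `attach` pushes the consumer into a shared list.
fn stream_from<T: 'static>(sinks: Sinks<T>) -> Stream<T> {
    Stream { attach: Box::new(move |sink: Sink<T>| sinks.borrow_mut().push(sink)) }
}

/// Calls every consumer in the list with a copy of the value.
fn emit<T: Copy>(sinks: &Sinks<T>, value: T) {
    for sink in sinks.borrow_mut().iter_mut() {
        sink(value);
    }
}

impl<T: Copy + 'static> Input<T> {
    fn new() -> (Input<T>, Stream<T>) {
        let sinks: Sinks<T> = Rc::new(RefCell::new(Vec::new()));
        (Input { sinks: Rc::clone(&sinks) }, stream_from(sinks))
    }

    fn feed(&self, value: T) {
        emit(&self.sinks, value);
    }
}

impl<T: Copy + 'static> Stream<T> {
    /// Lazily wraps the parent: nothing is attached until a consumer arrives.
    fn map<U: 'static>(self, f: impl Fn(T) -> U + 'static) -> Stream<U> {
        let attach_parent = self.attach;
        Stream {
            attach: Box::new(move |mut next: Sink<U>| {
                attach_parent(Box::new(move |t: T| next(f(t))))
            }),
        }
    }

    fn subscribe(self, f: impl FnMut(T) + 'static) {
        (self.attach)(Box::new(f));
    }

    /// Consumes the stream and returns two independent handles.
    fn fork(self) -> (Stream<T>, Stream<T>) {
        let sinks: Sinks<T> = Rc::new(RefCell::new(Vec::new()));
        let fan_out = Rc::clone(&sinks);
        (self.attach)(Box::new(move |t: T| emit(&fan_out, t)));
        (stream_from(Rc::clone(&sinks)), stream_from(sinks))
    }
}

/// Builds the graph from the question top-down and feeds 42 through it.
fn demo() -> Vec<String> {
    let log: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
    let (input, stream) = Input::<i32>::new();
    let (x1, x2) = stream.map(|i| i * 2).fork();

    let log_y = Rc::clone(&log);
    x1.map(|x| x + 3)
        .subscribe(move |y| log_y.borrow_mut().push(format!("y={}", y)));
    let log_z = Rc::clone(&log);
    x2.map(|x| x - 4)
        .subscribe(move |z| log_z.borrow_mut().push(format!("z={}", z)));

    input.feed(42);
    let result = log.borrow().clone();
    result
}

fn main() {
    for line in demo() {
        println!("{}", line);
    }
}
```

This keeps the top-down reading order, but it pays for it with boxing and RefCell bookkeeping at every fork, so it is unlikely to unroll into the flat closure shown earlier.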