
I'm trying to implement a ReactiveX-like graph engine in Rust (somewhat similar to RxJava, RxJS and friends), but geared towards synchronous computation only, with a focus on caching intermediate values. It's kind of like the standard library iterators, but backwards: values are pushed down from the source instead of being pulled from the end.

I've implemented a toy prototype of the Observable/Observer pattern: Playground -- and although it's not quite what I want in the end, it demonstrates the general spirit of things.

Basically, I'd want code that looks like this (pseudocode):

let mut input = Input();
let mut x = input.map(|i| i * 2);
// unavoidable multiple mut borrow?..
let mut y = x.fork().map(|x| x + 3); // [must_use]
y.subscribe(|y| println!("y={}", y)); // moved
let mut z = x.fork().map(|x| x - 4).do_more_stuff(); // [must_use]
z.subscribe(|z| println!("z={}", z)); // moved
// some time later:
input.feed(42);

to be unrolled into roughly this:

|i| {
    let x = i * 2;
    let y = x + 3;
    println!("y={}", y);
    let z = do_more_stuff(x - 4);
    println!("z={}", z);
}

The syntax is important here. I know how to do it "backwards", or how to modify it significantly so that it kind of works, but I'd like to keep the "top-down" syntax if possible for readability's sake, the forks in particular.

So you can think of it as a directed graph with a single input at the top. Each node does some computation and passes the result to the nodes below it (possibly several times, or possibly not at all), potentially "forking" so that the value can be passed to multiple lower-level nodes without being recomputed. At the very bottom there are "subscribers/observers" that listen to values and can do anything with them. Once created, the graph structure is completely immutable.

In implementations of many Rx frameworks, it's possible to do this without explicitly carrying the value around as a struct field, due to the fact that it will live in the parent closure.
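As a rough illustration of that point in Rust, here is a hand-written closure for the small graph from the pseudocode above. The names `build_pipeline`, `on_y` and `on_z` are made up for this sketch, and `do_more_stuff` is left out. The intermediate `x` is computed once per input and exists only as a local of the returned closure, never as a struct field.

```rust
/// Builds the "unrolled" graph by hand: one input, one shared intermediate
/// value `x`, and two downstream observers.
/// (Hypothetical helper for illustration; not taken from the playground.)
pub fn build_pipeline<'a>(
    mut on_y: impl FnMut(i64) + 'a,
    mut on_z: impl FnMut(i64) + 'a,
) -> impl FnMut(i64) + 'a {
    move |i| {
        // Computed once per input; lives only on this closure's stack frame.
        let x = i * 2;
        on_y(x + 3);
        on_z(x - 4);
    }
}
```

Calling the returned closure with 42 passes 87 to `on_y` and 80 to `on_z`. The hard part of the question is getting this shape out of a chained, top-down builder syntax rather than writing it by hand.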

I've spent a considerable amount of time thinking and prototyping in Rust, but I can't quite figure out a solution that is both fast and ergonomic, as in the samples above. For instance, the desired syntax for fork() (which splits the stream) seems to immediately imply multiple mutable borrows of the parent, even if the child later gets moved?

The core ideas that I think would work (and that the playground linked above implements) are as follows. An Observer is a trait with a single method, on_next(&mut self, value: T), so that any FnMut(T) -> () can implement it automatically. A "fork" holds a list of Box<Observer> and calls each of them in turn. Finally, there is perhaps a "stream/observable" that accepts an observer subscribing to it (in a mutable fashion?).
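A minimal sketch of the first two ideas, written from the description above rather than copied from the playground. `Observer` and `on_next` come from the question; the `Fork` struct, its `subscribe` method and the `Clone` bound are assumptions made for this example.

```rust
/// Anything that can receive values pushed from upstream.
pub trait Observer<T> {
    fn on_next(&mut self, value: T);
}

/// Blanket impl: every `FnMut(T)` closure is an observer.
impl<T, F: FnMut(T)> Observer<T> for F {
    fn on_next(&mut self, value: T) {
        self(value)
    }
}

/// A fork owns its downstream observers and hands each one a clone of the
/// incoming value. (Hypothetical type; the `Clone` bound is an assumption.)
pub struct Fork<'a, T> {
    observers: Vec<Box<dyn Observer<T> + 'a>>,
}

impl<'a, T: Clone> Fork<'a, T> {
    pub fn new() -> Self {
        Fork { observers: Vec::new() }
    }

    /// Registers a downstream observer; note that this needs `&mut self`.
    pub fn subscribe<O: Observer<T> + 'a>(&mut self, observer: O) {
        self.observers.push(Box::new(observer));
    }
}

/// A fork is itself an observer, so forks can be nested.
impl<'a, T: Clone> Observer<T> for Fork<'a, T> {
    fn on_next(&mut self, value: T) {
        for observer in self.observers.iter_mut() {
            observer.on_next(value.clone());
        }
    }
}
```

Because `subscribe` takes `&mut self`, every branch has to be attached through a mutable borrow of the fork, which is exactly the ergonomic problem described above. Closures passed to `subscribe` need an annotated parameter type (for example `|x: i32| ...`), since the compiler does not deduce it from the `Observer<T>` bound.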

Anyway, maybe I'm hitting my head against a well-known wall here, or maybe there are better solutions to this (or perhaps this has already been implemented somewhere) -- any thoughts would be much appreciated.

  • Would it be acceptable if fork() consumes itself and returns two independent objects instead? Commented Oct 24, 2018 at 22:55
  • @orlp Yep, definitely. Commented Oct 25, 2018 at 11:34
  • @orlp I think I managed to solve it... it's a bit of a pain though :) (see the answer) Commented Oct 26, 2018 at 1:15

1 Answer

It's possible. Basically, this works:

let input = Input::<i64>::new();

let s1 = input.fork();
s1.subscribe(|x| {
    println!("s1: {}", x)
});

let s2 = input.fork();
s2.map(|x| x * 3).subscribe(|x| {
    println!("s2: {}", x)
});

let s3 = input.fork().map(|x| {
    let s3 = (x as f64) + 100.5;
    println!("s3: {}", s3);
    s3
}).share();

s3.fork().map(|x| x + 1.).subscribe(|x| println!("s4: {}", x));

s3.fork().map(|x| x + 2.).subscribe(|x| println!("s5: {}", x));

input.feed(1);
println!("---");
input.feed(2);

and outputs:

s1: 1
s2: 3
s3: 101.5
s4: 102.5
s5: 103.5
---
s1: 2
s2: 6
s3: 102.5
s4: 103.5
s5: 104.5

Implementation-wise, it comes down to Rc<RefCell<Vec<Box<dyn Trait>>>> and lots of lifetime markers...
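The answer does not include its implementation, so the following is only a sketch of how an `Rc<RefCell<Vec<Box<dyn ...>>>>` design could look. `Stream`, `Callback`, `Observers` and `emit` are names invented here. Unlike the usage shown above, this version passes values to closures by reference (so the s3 closure would read `*x as f64`), which is an assumption made to avoid cloning at forks.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// A boxed downstream callback; values are passed by reference so that
/// several branches can observe the same intermediate result.
type Callback<'a, T> = Box<dyn FnMut(&T) + 'a>;
type Observers<'a, T> = Rc<RefCell<Vec<Callback<'a, T>>>>;

/// Calls every registered callback with `value`. The graph must not be
/// modified from inside a callback: the `RefCell` stays mutably borrowed
/// for the whole loop, so a nested `borrow_mut` would panic.
fn emit<'a, T: 'a>(observers: &Observers<'a, T>, value: &T) {
    for cb in observers.borrow_mut().iter_mut() {
        (*cb)(value);
    }
}

/// A node that can be forked any number of times.
pub struct Input<'a, T> {
    observers: Observers<'a, T>,
}

impl<'a, T: 'a> Input<'a, T> {
    pub fn new() -> Self {
        Input { observers: Rc::new(RefCell::new(Vec::new())) }
    }

    /// Returns an independent branch. Only `&self` is needed because the
    /// observer list sits behind `Rc<RefCell<..>>`.
    pub fn fork(&self) -> Stream<'a, T> {
        let observers = Rc::clone(&self.observers);
        Stream {
            attach: Box::new(move |cb: Callback<'a, T>| observers.borrow_mut().push(cb)),
        }
    }

    pub fn feed(&self, value: T) {
        emit(&self.observers, &value);
    }
}

/// A branch that is not subscribed yet: it only knows how to attach a
/// callback to its upstream node.
pub struct Stream<'a, T> {
    attach: Box<dyn FnOnce(Callback<'a, T>) + 'a>,
}

impl<'a, T: 'a> Stream<'a, T> {
    /// Wraps the downstream callback so that it receives `f(value)`.
    pub fn map<U: 'a, F>(self, mut f: F) -> Stream<'a, U>
    where
        F: FnMut(&T) -> U + 'a,
    {
        let attach = self.attach;
        Stream {
            attach: Box::new(move |mut cb: Callback<'a, U>| {
                attach(Box::new(move |value: &T| cb(&f(value))))
            }),
        }
    }

    /// Consumes the branch and registers `f` upstream.
    pub fn subscribe<F: FnMut(&T) + 'a>(self, f: F) {
        (self.attach)(Box::new(f));
    }

    /// Turns the branch into a new forkable node, so the work done so far
    /// runs once per value no matter how many branches follow.
    pub fn share(self) -> Input<'a, T> {
        let node = Input::new();
        let observers = Rc::clone(&node.observers);
        self.subscribe(move |value: &T| emit(&observers, value));
        node
    }
}
```

In this sketch `fork()` borrows its parent only immutably, so the multiple-mutable-borrow problem from the question does not come up. The cost is one dynamic dispatch per edge and a runtime borrow check per node, so it is not the fully unrolled closure the question asks for.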
