How to define cycles with observables

I'm trying to set up the update loop of a simple game, built with observables in mind. The top-level components are a model, which takes input commands and produces updates, and a view, which displays the received updates and produces input. In isolation, both work fine; the problematic part is putting the two together, since each depends on the other.

With the components being simplified to the following:

var view = function (updates) {
  return Rx.Observable.fromArray([1,2,3]);
};
var model = function (inputs) {
  return inputs.map(function (i) { return i * 10; });
};

The way I've hooked things together is this:

var inputBuffer = new Rx.Subject();
var updates = model(inputBuffer);
var inputs = view(updates);
updates.subscribe(
    function (i) { console.log(i); },
    function (e) { console.log("Error: " + e); },
    function () { console.log("Completed"); }
);
inputs.subscribe(inputBuffer);

That is, I add a subject as a placeholder for the input stream, and attach the model to that. Then, after the view is constructed, I pass on the actual inputs to the placeholder subject, thus closing the loop.
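
To make the wiring order concrete without depending on Rx, here is a minimal sketch of the same loop. `TinySubject` is a hypothetical, hand-rolled stand-in for `Rx.Subject` (synchronous, with no error or completion handling), and the stub `view` only imitates a fixed list of inputs; neither is part of any library.

```javascript
// Hypothetical, minimal stand-in for Rx.Subject: synchronous, onNext only.
function TinySubject() {
  this.observers = [];
}
TinySubject.prototype.subscribe = function (onNext) {
  this.observers.push(onNext);
};
TinySubject.prototype.onNext = function (value) {
  // Copy the list so observers added during dispatch do not see this value
  this.observers.slice().forEach(function (observer) { observer(value); });
};
// Derive a new stream by applying fn to every value
TinySubject.prototype.map = function (fn) {
  var source = this;
  return {
    subscribe: function (onNext) {
      source.subscribe(function (value) { onNext(fn(value)); });
    }
  };
};

// Same shape as the simplified model and view above
var model = function (inputs) {
  return inputs.map(function (i) { return i * 10; });
};
var view = function (updates) {
  // Ignores the updates and emits a fixed list of inputs on subscription
  return {
    subscribe: function (onNext) {
      [1, 2, 3].forEach(function (i) { onNext(i); });
    }
  };
};

var received = [];
var inputBuffer = new TinySubject();   // placeholder for inputs that do not exist yet
var updates = model(inputBuffer);      // the model only sees the placeholder
var inputs = view(updates);            // the view sees the real updates
updates.subscribe(function (u) { received.push(u); });
// Closing the loop: forward the real inputs into the placeholder
inputs.subscribe(function (i) { inputBuffer.onNext(i); });

console.log(received); // [ 10, 20, 30 ]
```

In this sketch the order of the last two subscriptions matters: the placeholder drops any value that arrives before something is listening to the updates, so the inputs have to be connected last.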

I can't help but feel this is not the proper way to do things, however. Using a subject for this seems to be overkill. Is there a way to do the same thing with publish() or defer() or something along those lines?

UPDATE: Here's a less abstract example to illustrate what I'm having problems with. Below you see the code for a simple "game", where the player needs to click on a target to hit it. The target can either appear on the left or on the right, and whenever it is hit, it switches to the other side. Seems simple enough, but I still have the feeling I'm missing something...

//-- Helper methods and whatnot
// Variables to easily represent the two states of the target
var left = 'left';
var right = 'right';
// Transition from one side to the other
var flip = function (side) {
  if (side === left) {
    return right;
  } else {
    return left;
  }
};
// Creates a predicate used for hit testing in the view
var nearby = function (target, radius) {
  return function (position) {
    var min = target - radius;
    var max = target + radius;
    return position >= min && position <= max;
  };
};
// Same as Observable.prototype.scan, but it also yields the initial value immediately.
var initScan = function (values, init, updater) {
  var initValue = Rx.Observable.return(init);
  var restValues = values.scan(init, updater);
  return initValue.concat(restValues);
};

//-- Part 1: From input to state --
var process = function (inputs) {
  // Determine new state based on current state and input
  var update = function(current, input) {
    // Input value ignored here because there's only one possible state transition
    return flip(current);
  };
  return initScan(inputs, left, update);
};
//-- Part 2: From display to inputs --
var display = function (states) {
  // Simulate clicks from the user at various positions (only one dimension, for simplicity)
  var clicks = Rx.Observable.interval(800)
      .map(function (v) {return (v * 5) % 30; })
      .do(function (v) { console.log("Shooting at: " + v)})
      .publish();
  clicks.connect();

  // Display position of target depending on the model
  var targetPos = states.map(function (state) {
    return state === left ? 5 : 25;
  });
  // Determine which clicks are hits based on displayed position
  return targetPos.flatMapLatest(function (target) {
    return clicks
        .filter(nearby(target, 10))
        .map(function (pos) { return "HIT! (@ "+ pos +")"; })
        .do(console.log);
  });
};

//-- Part 3: Putting the loop together 
/**
 * Creates the following feedback loop:
 * - Commands are passed to the process function to generate updates.
 * - Updates are passed to the display function to generate further commands.
 * - (this closes the loop)
 */
var feedback = function (process, display) {
  var inputBuffer = new Rx.Subject(),
      updates = process(inputBuffer),
      inputs = display(updates);
  inputs.subscribe(inputBuffer);
};
feedback(process, display);
Hardboard answered 8/6, 2014 at 15:19 Comment(5)
Have you had a look at the Rx schedulers that are available? In Windows the EventLoopScheduler would be a good fit. I don't know if there is an RxJS equivalent. Nevertheless, the schedulers offer a very nice interface for creating update loops.Pyrex
I'm not all that familiar with the scheduler-related part of Rx, but I don't think that's relevant here. Those are responsible for when and how events get posted, but here my problem is declaratively defining what those events are in the first place.Hardboard
I think the schedulers would be worth your while looking at. There are, in Windows at least, some very nice recursively defined overloads for doing precisely the kind of thing you're trying here. I'll see if I can give an example later when I'm sitting at my desk.Pyrex
This question is old, but you could still have a look at cycle.js.org, it is dealing with precisely the same issues (and a few more). It is quite new and being extended, but for games with a static dependency graph, it should help a lot.Reggiereggis
I have since found cycle.js as well (talked briefly with its creator, too), it had some pretty interesting ideas. Doesn't do everything quite the way I would've, but it's pretty damn close! It's a good recommendation regardless.Hardboard

I think I understand what you are trying to achieve here:

  • How can I get a sequence of input events going in one direction that feed into a model
  • But have a sequence of output events going in the other direction that feed from the model to the view

I believe the answer here is that you probably want to flip your design. Assuming an MVVM-style design, the Model no longer knows about the input sequence; it becomes agnostic. This means that you now have a model with an InputReceived/OnInput/ExecuteCommand method that the View calls with the input values. It should then be a lot easier to work with a "commands in one direction, events in the other direction" pattern. A sort of tip of the hat to CQRS here.

We have used that style extensively on Views+Models in WPF/Silverlight/JS for the last 4 years.

Maybe something like this:

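// Both functions are meant to be invoked with `new`
var model = function()
{
    var self = this;
    // Placeholder (a Subject is assumed here): create the real observable sequence here
    self.output = new Rx.Subject();

    self.filter = function(input) {
        // Perform some command with input here
    };
}

var viewModel = function (model) {
    var self = this;
    self.filterText = ko.observable('');
    self.items = ko.observableArray();
    // Commands flow from the view model into the model...
    self.filterText.subscribe(function(newFilterText) {
        model.filter(newFilterText);
    });
    // ...and events flow back out of the model
    model.output.subscribe(function(item) {
        self.items.push(item);
    });
};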
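
For a version that runs without Rx or Knockout, here is a rough sketch of the same "commands in, events out" shape. `FilterModel`, `FilterViewModel`, `setFilterText` and the prefix-matching rule are all hypothetical names and behavior chosen for illustration, and the `output` object only imitates the `subscribe` part of an observable.

```javascript
// Hypothetical model: commands come in through filter(), events go out through output.
function FilterModel(allItems) {
  var listeners = [];
  // Event side: a bare-bones imitation of an observable's subscribe
  this.output = {
    subscribe: function (listener) { listeners.push(listener); }
  };
  // Command side: called by the view model with each new input value
  this.filter = function (text) {
    var matches = allItems.filter(function (item) {
      return item.indexOf(text) === 0; // assumed rule: keep items starting with text
    });
    listeners.forEach(function (listener) { listener(matches); });
  };
}

// Hypothetical view model: knows the model, but the model knows nothing about it.
function FilterViewModel(model) {
  var self = this;
  self.items = [];
  // Events flow from the model to the view model
  model.output.subscribe(function (matches) { self.items = matches; });
  // Commands flow from the view model to the model
  self.setFilterText = function (text) { model.filter(text); };
}

var filterModel = new FilterModel(["apple", "apricot", "banana"]);
var filterViewModel = new FilterViewModel(filterModel);
filterViewModel.setFilterText("ap");
console.log(filterViewModel.items); // [ 'apple', 'apricot' ]
```

The dependency only points one way here: the view model holds a reference to the model, and the model reaches the view model only through whatever callbacks were registered on `output`.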

Update:

Thanks for posting a full sample. It looks good. I like your new initScan operator; it seems like an obvious omission from Rx.

I took your code and restructured it the way I probably would have written it. I hope it helps. The main things I did were to encapsulate the logic (flip, nearby, etc.) in the model and have the view take the model as a parameter. I then also had to add some members to the model instead of it just being an observable sequence. This did, however, allow me to remove some extra logic (the hit logic) from the view and put it in the model too.

//-- Helper methods and whatnot

// Same as Observable.prototype.scan, but it also yields the initial value immediately.
var initScan = function (values, init, updater) {
  var initValue = Rx.Observable.return(init);
  var restValues = values.scan(init, updater);
  return initValue.concat(restValues);
};

//-- Part 1: From input to state --
var process = function () {
  var self = this;
  var shots = new Rx.Subject();
  // Variables to easily represent the two states of the target
  var left = 'left';
  var right = 'right';
  // Transition from one side to the other
  var flip = function (side) {
    if (side === left) {
      return right;
    } else {
      return left;
    }
  };
  // Determine new state based on current state and input
  var update = function(current, input) {
    // Input value ignored here because there's only one possible state transition
    return flip(current);
  };
  // Creates a predicate used for hit testing in the view
  var isNearby = function (target, radius) {
    return function (position) {
      var min = target - radius;
      var max = target + radius;
      return position >= min && position <= max;
    };
  };

  self.shoot = function(input) { 
    shots.onNext(input); 
  };

  self.positions = initScan(shots, left, update).map(function (state) {
    return state === left ? 5 : 25;
  });

  self.hits = self.positions.flatMapLatest(function (target) {  
    return shots.filter(isNearby(target, 10));
  });
};
//-- Part 2: From display to inputs --
var display = function (model) {
  // Simulate clicks from the user at various positions (only one dimension, for simplicity)
  var clicks = Rx.Observable.interval(800)
      .map(function (v) {return (v * 5) % 30; })
      .do(function (v) { console.log("Shooting at: " + v)})
      .publish();
  clicks.connect();

  model.hits.subscribe(function (pos) {
    console.log("HIT! (@ " + pos + ")");
  });

  // Forward every click to the model as a shot
  clicks.subscribe(function (pos) {
    model.shoot(pos);
  });
};

//-- Part 3: Putting the loop together 
/**
 * Creates the following feedback loop:
 * - Commands are passed to the process function to generate updates.
 * - Updates are passed to the display function to generate further commands.
 * - (this closes the loop)
 */
var feedback = function (process, display) {
  // process assigns its members to `this`, so it has to be invoked with `new`
  var model = new process();
  var view = display(model);
};
feedback(process, display);
Lackaday answered 23/6, 2014 at 13:58 Comment(7)
This seems to be on the right track, but as far as I understand, in this example you're modifying the items array in the view, relying on side effects to "close the loop", so to speak. I assume that in functional reactive programming, there's also a pure solution for this, somehow. I'm just not yet seeing what.Hardboard
Hmmm, I wouldn't call anything that has a circular dependency pure ;-). Remember Observable Sequences are just an evolution of simple callbacks. Callbacks are a way for a layered design to allow a consumed API to call back into the consuming API. In your design, which is the consumer/observer and which is the consumed/observed?Lackaday
I have updated my answer to show how I might have done it. I appreciate that this is a different style to the original, but I hope you can take some value from it.Lackaday
I wouldn't say this kind of circular dependency is inherently impure. You can say that a graph has two nodes, A and B, and two edges, A->B and B->A. Why couldn't you do the same with functions or streams or whatever? Found some discussion on that here: github.com/leonidas/codeblog/blob/master/2012/… ...Unfortunately I'm barely familiar with Haskell, so it'll take some deciphering. Anyway, I appreciate your update, it does simplify some things. Still not quite what I'm looking for, but it'll do for now.Hardboard
Sorry, it was a tongue-in-cheek comment. However I would still prefer to have a single direction model. If you needed one to observe the other, then maybe some sort of bus in the middle is appropriate. But I am blowing the same tune now, and it seems you are looking for a specific hammer.Lackaday
Old topic; however, a form of cycling is done in the Cycle.js framework.Bowling
Yes, introducing an EventLoop, Bus or other intermediary seems to be a better design. Cycle.js, ESP (github.com/esp/esp-js) and others out there provide this feature.Lackaday

I presume that because you do not "assign" the inputs after the model is created, you are aiming for a non-mutative approach to instantiating your model and view. However, your model and your view seem to depend on one another. To resolve this issue, you can use a third party to facilitate the relationship between the two objects. In this case, you can simply use a function for dependency injection...

var log           = console.log.bind(console),
    logError      = console.log.bind(console, 'Error:'),
    logCompleted  = console.log.bind(console, 'Completed.');

model(
    function (updates) {
      return view(updates);
    }
  )
  .subscribe(
    log,
    logError,
    logCompleted
    );

By providing the model a factory to create a view, you give the model the ability to fully instantiate itself by instantiating its view, but without knowing how the view is instantiated.
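
One way a model might consume such a factory is sketched below, without Rx. `createSubject` is a hypothetical hand-rolled stand-in for a subject, and the stub `view` and the times-ten rule are assumptions carried over from the simplified question code. Note that in this sketch the placeholder stream does not disappear; it only moves inside the model.

```javascript
// Hypothetical minimal subject: synchronous, onNext only.
function createSubject() {
  var observers = [];
  return {
    subscribe: function (onNext) { observers.push(onNext); },
    onNext: function (value) {
      observers.slice().forEach(function (observer) { observer(value); });
    }
  };
}

// The model receives a view factory and wires the loop itself.
var model = function (createView) {
  var inputBuffer = createSubject();  // internal placeholder for the view's inputs
  var updates = createSubject();
  inputBuffer.subscribe(function (i) { updates.onNext(i * 10); });
  return {
    subscribe: function (onNext) {
      updates.subscribe(onNext);
      // Build the view only once somebody listens, then feed its inputs back in.
      // Each subscription creates a fresh view in this simplified sketch.
      createView(updates).subscribe(function (i) { inputBuffer.onNext(i); });
    }
  };
};

// Stub view: ignores the updates and emits a fixed list of inputs
var view = function (updates) {
  return {
    subscribe: function (onNext) {
      [1, 2, 3].forEach(function (i) { onNext(i); });
    }
  };
};

var received = [];
model(function (updates) { return view(updates); })
  .subscribe(function (u) { received.push(u); });

console.log(received); // [ 10, 20, 30 ]
```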

Thole answered 10/6, 2014 at 17:5 Comment(2)
You're correct, the lack of mutations is intentional, I'm trying to see how well a more-or-less functional approach works here. Functional reactive programming is a thing, so it's supposed to... maybe it's a language limitation I'm running into? Anyway, I think you're onto something. But as far as I can tell, this still doesn't solve the problem, it's just shifted the same circular dependency inside of the model. That still needs to create the view (for the inputs stream) before it can define the updates stream. ...wait, maybe I just need to use .defer() there, and that works?Hardboard
A synchronous circular dependency using defer is going to cause infinite recursion. Instead, you can factor out the circular nature of the problem: a controller receives bubbling events, delegates events to the view for handling, merges in any new events from that view, filters out events it can handle, returns it back to the controller, the controller handles events, and bubbles any remaining. The model exists in and of itself... if it needs "updates" funneled through via an observable, that's something that needs to be worked out. That really sounds more like a data-binding issue.Thole

As per my comment on the question itself, here's the same sort of code you're writing done with a scheduler in Windows. I would expect a similar interface in RxJS.

var scheduler = new EventLoopScheduler();

var subscription = scheduler.Schedule(
    new int[] { 1, 2, 3 },
    TimeSpan.FromSeconds(1.0),
    (xs, a) => a(
        xs
            .Do(x => Console.WriteLine(x))
            .Select(x => x * 10)
            .ToArray(),
        TimeSpan.FromSeconds(1.0)));

The output I get, with three new numbers every second, is:

1
2
3
10
20
30
100
200
300
1000
2000
3000
10000
20000
30000
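
Leaving the timing aside, the recursion behind this output can be sketched in plain JavaScript. `scheduleRecursive` below is a hypothetical synchronous stand-in written for illustration, not an RxJS API, and the `maxRounds` cap is an assumption added so that the sketch terminates (the scheduled version keeps going).

```javascript
// Hypothetical stand-in for a recursive scheduler: `action` receives the current
// state and a `recurse` callback that it may call with the next state.
function scheduleRecursive(initialState, maxRounds, action) {
  var pending = [initialState];
  var rounds = 0;
  // A queue is used instead of direct recursion so the call stack does not grow
  while (pending.length > 0 && rounds < maxRounds) {
    var state = pending.shift();
    rounds++;
    action(state, function (nextState) { pending.push(nextState); });
  }
}

var printed = [];
scheduleRecursive([1, 2, 3], 5, function (xs, recurse) {
  // "Display" the current batch...
  xs.forEach(function (x) { printed.push(x); });
  // ...then feed the batch, multiplied by ten, back in as the next state
  recurse(xs.map(function (x) { return x * 10; }));
});

console.log(printed.join(" "));
// 1 2 3 10 20 30 100 200 300 1000 2000 3000 10000 20000 30000
```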
Pyrex answered 12/6, 2014 at 4:11 Comment(2)
I don't think the question is asking how to create an EventLoop, or a cycle of events. I think the question is how does he handle cyclic dependencies -or- ClassA observes ClassB, but ClassB observes ClassA.Lackaday
@LeeCampbell - I took it to mean that he's trying to create cyclic dependencies in order to create an event loop.Pyrex
