How can I process a series of messages that come through to a function, in the order that they come, when the functions to process the messages are operating on the messages asynchronously?

An example:

var processMessage = function(msg) {
  switch (msg.action) {
    case "doFoo":
      this.processFoo(() => {
        // `foo` is done processing
      });
      break;
    case "doBar":
      this.processBar(() => {
        // `bar` is done processing
      });
      break;
    case "doBaz":
      this.processBaz(() => {
        // `baz` is done processing
      });
      break;
  }
};

Notes

  • I could push the items into an array and then use async.eachSeries to process the array of messages
  • However, messages arrive constantly, so the array keeps filling with new items while it is being processed, which disrupts the processing

Is there a de facto standard solution to this kind of problem?

  • Sounds like streams, something like Rx might help. Commented Aug 19, 2016 at 16:53
  • @elclanrs Ha, I thought this was a common problem and my brain was just playing dumb for lack of caffeine. Commented Aug 19, 2016 at 16:54
  • Just recursively pull a message from the queue, process it, if any messages still in the queue, call it again. The code should know that the queue may change while waiting for an async response which is OK if you write the code appropriately. Commented Aug 19, 2016 at 16:55
  • @jfriend but when do I start processing the queue? Every time a message arrives? Commented Aug 19, 2016 at 16:56
  • In Rx it would be something like stream.flatMap(processMessage) where processMessage returns a promise. Commented Aug 19, 2016 at 17:06

1 Answer

Here's a general scheme:

  1. When a new message arrives, check a flag to see whether you are already in the middle of processing a message. If the flag is set, just add the message to the queue.
  2. If the flag is not set, pick the message to process: the one that just arrived or, when you return here from step 5, the oldest message in the queue (remove it from the queue).
  3. When you start processing that message, set the flag to indicate that you are now in the middle of processing a message.
  4. Start the async operation that processes the message.
  5. When the callback that signals completion of the async operation fires, clear the flag to indicate that you are no longer in the middle of processing, and recursively call a function that starts at step 2 again.

You trigger processing at two points: first, when a new message arrives and you aren't already processing one; second, when the processing of a message completes, at which point you check whether anything was added to the queue in the meantime.

You maintain an inProcess-style flag so that you never start processing an incoming message while another message is still being processed (this forces the serial execution you requested).

You have to handle error conditions rigorously so that the flag never gets stuck and queue processing never stalls.
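
As one way to make the error-handling point concrete, here is a minimal sketch of a drain loop that clears the flag exactly once per message, whether the worker succeeds, reports an error, throws synchronously, or calls back twice. The names createSerialQueue, worker, onError, add and size are hypothetical and chosen only for illustration; worker(msg, done) is an assumed callback-style signature.

```javascript
// Hypothetical helper, not part of any library. `worker(msg, done)` is an
// assumed callback-style signature; `onError(err, msg)` is optional.
function createSerialQueue(worker, onError) {
  var queue = [];
  var inProcess = false;

  function next() {
    if (inProcess || queue.length === 0) {
      return; // busy, or nothing left to do
    }
    inProcess = true; // set the flag before handing off to the worker
    var msg = queue.shift();
    var finished = false;

    function done(err) {
      if (finished) {
        return; // ignore a second completion for the same message
      }
      finished = true;
      inProcess = false;
      try {
        if (err && onError) {
          onError(err, msg);
        }
      } finally {
        next(); // keep draining even if onError itself throws
      }
    }

    try {
      worker(msg, done);
    } catch (err) {
      done(err); // a synchronous throw counts as a failed message
    }
  }

  return {
    add: function (msg) {
      queue.push(msg);
      next();
    },
    size: function () {
      return queue.length; // messages still waiting, excluding the one in flight
    }
  };
}
```

Every message goes through the queue here, even when nothing is in flight, so there is only one code path that starts work. If the worker completes synchronously, next() recurses once per queued message, which is fine for short bursts but worth keeping in mind for very long queues.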

In code, as methods on a queue object that holds the pending messages in an array (someAsyncProcessing is a placeholder for whatever async work handles a message):

var messageQueue = {
    queue: [],
    inProcess: false,

    addQueue: function(msg) {
        if (!this.inProcess) {
            // not currently processing anything so just process the message
            this.processMessage(msg);
        } else {
            // a message is in flight, so hold this one until it completes
            this.queue.push(msg);
        }
    },

    processMessage: function(msg, completeFn) {
        var self = this;
        // must set this flag before going async
        self.inProcess = true;
        // asynchronously process this message
        someAsyncProcessing(msg, function(err) {
            self.inProcess = false;
            if (completeFn) {
                completeFn(err);
            }
            // see if anything else arrived in the queue while we were busy
            if (self.queue.length) {
                // pull out the oldest message and process it
                var nextMsg = self.queue.shift();
                self.processMessage(nextMsg);
            }
        });
    }
};
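
If the per-message work returns a promise rather than taking a callback (as one of the comments on the question suggests), roughly the same serial behaviour can be sketched by chaining each message onto the previous one. This is a different technique from the flag-and-queue scheme above, and createPromiseQueue, handler and enqueue are hypothetical names used only for illustration.

```javascript
// Hypothetical helper: serializes a promise-returning `handler` by chaining
// each new message onto the tail of the previous one.
function createPromiseQueue(handler) {
  var tail = Promise.resolve();

  return function enqueue(msg) {
    // Run the handler only after everything queued earlier has settled.
    var result = tail.then(function () {
      return handler(msg);
    });
    // Swallow the rejection on the chain itself so later messages still run.
    tail = result.catch(function () {});
    // The caller still sees the real outcome, including any rejection.
    return result;
  };
}
```

Here the promise chain plays the role of both the queue and the in-process flag: a message starts only when the one before it has resolved or rejected, and a failure in one message does not block the ones behind it.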