0

I am attempting to read a stream line by line and for each line do some async processing. The issue I'm having is how to determine when the operations for all lines are complete. I thought I could use the readline "close" event but that seems to be triggered long before the async operations I've started on the "line" event complete.

Here's the event handlers in question:

logReader.on("line", inputLine => {
    self._processLogLine(inputLine, () => {
        if (self.streamFinished) {
            completionCallback(err);
        }
    });
});

logReader.on("close", () => {
    self.streamFinished = true;
});

The completionCallback method should be called when all line processing on the stream is done.

I've thought about adding counters for line operations started/completed and calling the completionCallback when they match but that seems awkward.

2
  • What is logReader? Commented Oct 27, 2017 at 18:21
  • logReader is the readline interface let logReader = readline.createInterface(self.inputStream); Commented Oct 30, 2017 at 14:31

3 Answers 3

1

Here is one way you could do it basically keeping track of each lines success. I don't have enough of your code to test it...

const lineProcessing = [];

const doCompletion = () => {
  if (self.streamFinished && !lineProcessing.includes(false)) {
    // Not sure where 'err' comes from
    completionCallback(err);
  }
}

logReader.on("line", inputLine => {
  lineProcessing.push(false);
  let processingIndex = lineProcessing.length - 1;
  self._processLogLine(inputLine, () => {
    lineProcessing[processingIndex] = true;
    doCompletion();
  });
});

logReader.on("close", () => {
  self.streamFinished = true;
});
Sign up to request clarification or add additional context in comments.

1 Comment

This is similar to what I wound up doing which was to use 2 counters linesStarted/linesCompleted and having my completion test be self.streamFinished && self.linesCompleted === self.linesStarted
1

It's a little difficult to tell for sure what needs to happen without more context. But here's a quick and somewhat dirty fix that should be adaptable to your situation. It shows how you can accomplish this with promises:

let promises = []

logReader.on("line", inputLine => {
    promises.push(new Promise(resolve => {
        self._processLogLine(inputLine, resolve)
    })
})

logReader.on("close", () => {
    Promise.all(promises)
        .then(completionCallback)
})

Essentially, just create an array of promises. When you know that all promises have been added to the array, call Promise.all() on it.

If you use this approach, you'd probably want to add error checking (rejecting and catching).

1 Comment

This is a nice solution but I hadn't been using Promise's and not sure how much is involved in converting the code to do so.
0

Look into generators. They give you a way to do a "for" loop...and when you're done, you just return from the generator function.

So you can have a generator to read the file, and a function that uses the generator, then when the loop ends, call your callback.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.