5

I'm reading a data stream from a CSV file row by row, and calling a findOne MongoDB call on each row, how can I wait till all the mongo calls from each row are complete before I run the next function?

I've seen Promises can do it? But I find Promises extremely difficult to understand. And none of the examples I've found seem to cover what I'm trying to it. :/

var validProducts = [];

fs.createReadStream(req.file.path)
  .pipe(csvStream)
  .on('error', function (err) {
    console.error(err);
  })
  // loop through all rows
  .on('data', function (data) {
    if (data.size === 'a3') {
      ProductModel.findOne({ sku: data.sku }, function (err, product) {
        if (product !== null) {
          product.size = data.size;
          product.save();
          validProducts.push(product);
        }
      });
    }
  });

// on finish make call to other function
socket.emit({ 'status': 'complete' });
otherFunction(validProducts);

on('finish') or on('end') will only call at the end of the data stream, not after the Monogo calls.

If I can use promises, can someone please explain how?

0

2 Answers 2

4

You could use the Q API witch allows you to make promises. There is an interesting function that allows you to wait for an array of promises to be resolved. Here is an example of how you could solve your problem with Q.all:

var validProducts = [];
var promises = [];

function handleData(data) {
    if (data.size === 'a3') {

        var deferred = Q.defer();

        ProductModel.findOne({ sku: data.sku }, function (err, product) {
            if (err) {
                deferred.reject(new Error(err));
            }

            if (product) {
                product.size = data.size;
                product.save();
                deferred.resolve(product);
                validProducts.push(product);
            }

        });

        promises.push(deferred.promise);

    }
}

function handleEnd() {
    Q.all(promises).done(function (values) {
        socket.emit({ 'status': 'complete' });
        otherFunction(validProducts);
    });
}

fs.createReadStream(req.file.path)
  .on('data', handleData)
  .on('end', handleEnd);
Sign up to request clarification or add additional context in comments.

6 Comments

:P I was so close to this on my own,. but didn't setup the deferred stuff right.
this wont wait till all the mongo calls from each row are complete before I run the next function at all
Maybe my logic is flawed? Correct me if i'm wrong but when there is no more data to read from the file, we are waiting for all the promises to be resolved before executing the specified functions.
If I'm right in my way of thinking, the stream will end before anything has been added to the promises array. So Q.all(promises) will execute instantly. :/
Actually there was a typo initially! I edited it and I thought it would go unnoticed. It was a pleasure ;)
|
3

Use pause/resume

  .on('data', function (data) {
    if (data.size === 'a3') {
      this.pause(); // pause it
      var stream = this; // save 'this'
      ProductModel.findOne({ sku: data.sku }, function (err, product) {
        if (product !== null) {
          product.size = data.size;
          product.save();
          validProducts.push(product);
        }
        stream.resume(); //resume it
      });
    }
  });

11 Comments

doesn't this make the function synchronous, not asynchronous?
Close. But the .resume() and .push() need to happen inside the callback from .save(). Also not sure that this is exposed, so you likely need to grab the stream externally first.
It's an I/O function for a database. Just about all I/O in node is async, or should be.
I actually think I have no clue what the question is :D - I thought each row was being processed by the on('data' function
the I/O is async, but only one mongo call will happen at once if the stream is paused until the previous one finishes, which makes it Synchronous (technically), while also (technically) answer my question..ish.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.