
How can I efficiently buffer events from a stream in Node.js so that I can bulk insert into MongoDB, instead of doing a separate insert for every record received from the stream? Here's the pseudo code I have in mind:

// Open MongoDB connection

mystream.on('data', (record) => {
   // bufferize data into an array
   // if the buffer is full (1000 records)
   // bulk insert into MongoDB and empty buffer
})

mystream.on('end', () => {
   // close connection
})

Does this look realistic? Is there any possible optimization? Are there existing libraries that facilitate this?

  • The Node.js native stream API sounds like the perfect fit; you should look into using a Writable. The size of the buffer can be controlled by setting highWaterMark. The Writable class also has a final() hook, which is called once the stream has completed and could be used to close the DB connection (see the sketch after these comments). Commented Nov 9, 2020 at 5:02
  • Thanks for your answer, I considered that option too, and it's probably the best way to solve this: the more data comes in, the more goes into the buffer, and the buffered data automatically populates the MongoDB database. I assume this way you can also control the data flow and have the consumed input discarded automatically. I'm planning to use small to quite large datasets with this approach (a few kB to 5-10 GB of streamed data). Commented Nov 9, 2020 at 5:23
  • Both MongoDB's native driver and the Mongoose API expose a DB cursor interface that can be wrapped as a stream.Readable (via stream.Readable.from()) and then piped into the buffering Writable. That way the script won't fetch more data than it can hold in its writable buffer. Commented Nov 9, 2020 at 12:01
  • This example is very close to what I'm looking for: github.com/sorribas/mongo-write-stream/blob/master/index.js Commented Nov 9, 2020 at 12:05
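
To make these suggestions concrete, here is a minimal sketch (an illustration, not taken from the answers below) combining both ideas: a Writable whose internal buffer is sized via highWaterMark, whose writev() bulk-inserts whatever has been buffered, and whose final() hook closes the client once the source ends. The source is wrapped with stream.Readable.from(), which accepts any (async) iterable such as a MongoDB cursor. The helper name, connection string, database, and collection names are placeholders.

const stream = require('stream');
const { MongoClient } = require('mongodb');

// `source`: any (async) iterable of documents, e.g. a MongoDB cursor
async function copyInto(source) {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const collection = client.db('my_db').collection('my_collection');

  const sink = new stream.Writable({
    objectMode: true,
    highWaterMark: 1000,        // buffer up to 1000 objects before applying backpressure
    writev(chunks, callback) {  // called with everything currently sitting in the buffer
      collection.insertMany(chunks.map(({ chunk }) => chunk), { ordered: false })
        .then(() => callback(), callback);
    },
    final(callback) {           // runs once the source has ended and the buffer is drained
      client.close().then(() => callback(), callback);
    }
  });

  // Readable.from() wraps the iterable so it is only pulled as fast as the
  // sink's buffer drains (stream.promises requires Node 15+; use
  // util.promisify(stream.pipeline) on older versions).
  await stream.promises.pipeline(stream.Readable.from(source), sink);
}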

2 Answers


Using Node.js' stream library, this can be implemented concisely and efficiently:

const stream = require('stream');
const util = require('util');
const mongo = require('mongodb');

let streamSource; // A stream of objects from somewhere (assign your Readable here)

(async () => {
  // Establish DB connection
  const client = new mongo.MongoClient("uri");
  await client.connect();

  // The specific collection to store our documents
  const collection = client.db("my_db").collection("my_collection");

  await util.promisify(stream.pipeline)(
    streamSource,
    new stream.Writable({
      objectMode: true,
      highWaterMark: 1000,
      // writev() receives every chunk currently buffered (between 1 and
      // highWaterMark of them) and bulk-inserts them in a single call
      writev: async (chunks, next) => {
        try {
          const documents = chunks.map(({chunk}) => chunk);

          await collection.insertMany(documents, {ordered: false});

          next();
        }
        catch (error) {
          next(error);
        }
      }
    })
  );

  await client.close();
})();

4 Comments

Again, thank you so much for your help; it might be a drop-in solution for my problem.
I'm relatively new to Node.js streams (in a more complex use case). This answer has helped me A LOT! Thanks heaps.
It's important to note that this method doesn't guarantee that writev always gets 1000 chunks; it can be anything between 1 and 1000.
Yes, at most 1000 chunks/documents, depending on how many objects were added to the write buffer between the previous call to writev and the current one (the instrumented writev below shows how to observe this).
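
To see this batching in practice, the writev implementation from the answer above can be instrumented with a log line (a sketch only, using the same collection variable and options as the answer's code):

writev: async (chunks, next) => {
  try {
    // chunks.length varies per call: anywhere from 1 up to highWaterMark (1000)
    console.log(`flushing ${chunks.length} buffered documents`);

    await collection.insertMany(chunks.map(({chunk}) => chunk), {ordered: false});
    next();
  }
  catch (error) {
    next(error);
  }
}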

I ended up with a solution that needs no dependencies beyond the MongoDB driver.

const { MongoClient } = require("mongodb")
const url = process.env.MONGO_URI || "mongodb://localhost:27019";

// `stream` is the Readable that emits the records, defined elsewhere
MongoClient.connect(url, { useNewUrlParser: true, useUnifiedTopology: true })
    .then((client) => {
        const dbName = "databaseName";
        const collection = "collection";
        const dbo = client.db(dbName);

        let buffer = []

        stream.on("data", (row) => {
            buffer.push(row)
            if (buffer.length > 10000) {
                // bulk insert the batch; note that this call is not awaited
                // (see the note below the code)
                dbo.collection(collection).insertMany(buffer, { ordered: false });
                buffer = []
            }
        });

        stream.on("end", () => {
            // insert the last, partially filled batch, then clean up
            dbo.collection(collection).insertMany(buffer, { ordered: false })
                .then(() => {
                    console.log("Done!");
                    client.close();
                })
        });

        stream.on("error", (err) => console.log(err));
    })
    .catch((err) => {
        console.log(err)
    })
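
One caveat with the snippet above: the insertMany() inside the "data" handler is not awaited, so a fast producer can have many inserts in flight at once and failures go unnoticed. A minimal variation of the data handler (a sketch, assuming the same stream, dbo, and collection variables, and that stream is a pausable Readable) pauses the source while each batch is written:

stream.on("data", (row) => {
    buffer.push(row)
    if (buffer.length >= 10000) {
        const batch = buffer;
        buffer = [];
        stream.pause();                                    // stop 'data' events during the insert
        dbo.collection(collection).insertMany(batch, { ordered: false })
            .then(() => stream.resume())                   // pull the next batch
            .catch((err) => stream.destroy(err));          // surface insert failures
    }
});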
