I have a data stream (via Node EventEmitter) emitting data in JSON format and would like to save the stream into Cassandra as it gets emitted. Is there an elegant way to implement this functionality?

The driver I'm using is nodejs-dse-driver and the Cassandra version is 3.11.1. Please suggest any recommended plugins I can leverage to accomplish this.

2 Answers

This is a good use case for a Transform Stream.

If you have a true Readable stream, you can pipe it into any Transform stream. An EventEmitter on its own is not a Readable stream, though, so you may need to change your original data-fetching implementation (see the sketch below).
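
If your data only arrives through an EventEmitter, one option is to wrap it in an object-mode Readable yourself. A minimal sketch, assuming your emitter fires 'data' with each JSON object and 'end' when it is done (those event names are placeholders for whatever your emitter actually uses):

const { Readable } = require('stream');

// Wrap an arbitrary EventEmitter in an object-mode Readable stream.
// Note: this simple version does not apply backpressure to the emitter.
function emitterToReadable(emitter) {
  const readable = new Readable({
    objectMode: true,
    read() {} // pushing is driven by the emitter's events below
  });
  emitter.on('data', (json) => readable.push(json));
  emitter.on('end', () => readable.push(null)); // signal end of stream
  return readable;
}

const originalStream = emitterToReadable(myEmitter);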

See the Node.js documentation for implementation details: https://nodejs.org/api/stream.html#stream_new_stream_transform_options

Something like this, depending on your version of Node.js:

const { Transform } = require('stream');

const myTransformStream = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // insert the row into Cassandra here
    cassandra.execute(query, row, { prepare: true }, (err) => {
      // after the execute is done, call back to process the next row
      callback(err, row);
    });
  }
});

originalStream.pipe(myTransformStream);
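
On Node.js 10 or later you could also use stream.pipeline() instead of .pipe(), so errors from either stream land in a single callback; a minimal sketch, assuming originalStream is the Readable source from above:

const { pipeline } = require('stream');

pipeline(originalStream, myTransformStream, (err) => {
  if (err) {
    console.error('Stream processing failed', err);
  } else {
    console.log('All rows written to Cassandra');
  }
});

pipeline also destroys both streams if one of them fails, which saves some manual cleanup compared with .pipe().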

You can read the data in chunks from your source and send it in parallel, for example (using the async library):

const limit = 10;
stream.on('readable', () => {
  let r;
  let rows = [];
  // note: this uses the async v2-style synchronous test function
  async.whilst(function condition() {
    // collect up to `limit` rows from what is currently readable
    rows = [];
    while (rows.length < limit && (r = stream.read()) !== null) {
      rows.push(r);
    }
    return rows.length > 0;
  }, function eachGroup(next) {
    // we have a group of 10 rows or less to save;
    // we can do it in a batch, or in parallel with async.each()
    async.each(rows, (row, eachCallback) => {
      // Adapt the row to bind parameters, for example:
      const params = row.split(',');
      client.execute(query, params, { prepare: true }, eachCallback);
    }, next);
  }, function groupFinished(err) {
    if (err) {
      // something happened when saving
      // TODO: do something with err
      return;
    }
    // this chunk of rows emitted by the stream was saved
  });
}).on('end', () => {
  // no more data from the source
});
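
If you want to try the batch route mentioned in the code comment above, the driver also exposes client.batch(); a rough sketch for one group of rows, where toParams() is a hypothetical helper that maps a row to its bind parameters:

// Alternative to async.each: send one group of rows as a single batch.
function saveGroup(rows, next) {
  const queries = rows.map((row) => ({
    query: query,
    params: toParams(row) // hypothetical row-to-parameters mapping
  }));
  client.batch(queries, { prepare: true }, next);
}

Keep in mind that batches in Cassandra are intended for atomicity rather than bulk-loading speed; for raw throughput, parallel individual inserts as shown above usually behave better.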

2 Comments

Thank you for the example above. I'm not using async right now; instead, inside the read stream's .on('data') handler I perform the Cassandra insert via client.execute. This setup works perfectly fine with smaller datasets, but with a large dataset (I tried inserting 5000+ rows) I get a timeout error: Error: All host(s) tried for query failed. First host tried, <myhost>:9042: OperationTimedOutError: The host <myhost>:9042 did not reply before timeout 12000 ms. See innerErrors.
When I run a simple query to get the row count for the table the inserts targeted, I get the same error. The Cassandra cluster I have is pretty small and only has 1 replica; is this happening because of the cluster size?
