I have been trying to move some of the work done in my program into a separate thread. One of the functions needs to return a stream to the main thread, but I'm getting the following exception:

Error
    at MessagePort.<anonymous> ([worker eval]:12:16)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
From previous event:
    at PoolWorker.work (node_modules/node-worker-threads-pool/src/pool-worker.js:22:12)
    at DynamicPool.runTask (node_modules/node-worker-threads-pool/src/pool.js:110:47)
    at DynamicPool.exec (node_modules/node-worker-threads-pool/src/dynamic-pool.js:51:17)
    at renderToPdf (src/modules/templates/render2.js:27:14)
    at Context.<anonymous> (test/modules/templates/render.test.js:185:68)

I tried to construct a minimal example to reproduce what I'm trying to achieve. Basically, what I need is to send a readable stream back to the main thread. In this example, I'm also getting an exception:

To get a pool of worker threads I'm using the library node-worker-threads-pool, specifically its DynamicPool. Inside the worker I'm trying to convert HTML to a PDF, but I need to somehow return the resulting stream to the main thread.

const os = require('os');
const { DynamicPool } = require('node-worker-threads-pool');

const Pool = new DynamicPool(os.cpus().length);

async function convertToPDF(html) {
  return await Pool.exec({
    task: function() {
      const Promise = require('bluebird');
      const pdf = require('html-pdf');

      const { html } = this.workerData;

      const htmlToPdf = (html, renderOptions) => {
        const options = {
          format: 'Letter',
        };
        return pdf.create(html, Object.assign(options, renderOptions || {}));
      };

      return Promise.fromNode((cb) => htmlToPdf(html, {}).toStream(cb));
    },
    workerData: {
      html,
    },
  });
}

convertToPDF('<div>Hello World!</div>')
  .then((resp) => console.log('resp', resp))
  .catch((err) => console.error('err', err));

err DataCloneError: function() {
    if (this.autoClose) {
      this.destroy();
    }
  } could not be cloned.
    at MessagePort.<anonymous> ([worker eval]:12:16)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)

Do you have any idea how I can achieve this?

PS: I'm aware that IO operations are not as performant in worker threads as they are in the Node.js main thread, but I need to do this to avoid blocking the main thread with these operations.

3 Answers

Short version: you can't.

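Communication between threads in Node is handled internally, but what we know is that message objects are serialized before sending and deserialized once received (worker_threads uses the structured clone algorithm for this). You can't serialize a Stream because it carries functions and is backed by an underlying resource (a socket, a file descriptor, custom read and write functions, etc.), none of which can be serialized and deserialized. That is what the DataCloneError in your output is reporting.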

So you are forced to exchange serializable data.
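
As a quick check of that claim, here is a minimal sketch that tries to post different values through a MessageChannel, which goes through the same cloning step as messages between threads. The helper name canBePosted is made up for this illustration.

```javascript
const { MessageChannel } = require('worker_threads');
const { Readable } = require('stream');

// Hypothetical helper: returns true if `value` survives the cloning step
// that postMessage() performs, false if postMessage() throws.
function canBePosted(value) {
  const { port1, port2 } = new MessageChannel();
  try {
    port1.postMessage(value);
    return true;
  } catch (err) {
    return false;
  } finally {
    // Close both ends so nothing lingers after the check.
    port1.close();
    port2.close();
  }
}

// A stream holds functions (here its read() implementation), so it is rejected.
console.log(canBePosted(new Readable({ read() {} }))); // false
// Plain data and binary data are fine.
console.log(canBePosted({ html: '<div>Hello World!</div>' })); // true
console.log(canBePosted(Buffer.from('pdf bytes'))); // true
```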

Taking a look at html-pdf, I think an easy way to convert your program is to use pdf.toBuffer: rather than trying to send a Stream to the main thread and reading it there to obtain a Buffer, you should send a Buffer to the main thread and then use it as is.
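
In the code from the question that would presumably mean calling toBuffer(cb) where toStream(cb) is called now, assuming it accepts the same Node-style callback. The sketch below shows the same idea using only built-in modules so it can run on its own: a small in-memory Readable stands in for the PDF stream, and renderInWorker and workerSource are names invented for the example. Note that a Buffer posted from a worker arrives in the main thread as a plain Uint8Array, so it is wrapped again on arrival.

```javascript
const { Worker } = require('worker_threads');

// Code executed inside the worker. The Readable is a stand-in for the
// stream that the PDF library would produce (an assumption of this sketch).
const workerSource = `
  const { parentPort, workerData } = require('worker_threads');
  const { Readable } = require('stream');

  const source = Readable.from([Buffer.from(workerData.text)]);
  const chunks = [];
  source.on('data', (chunk) => chunks.push(chunk));
  // Send the collected bytes instead of the stream itself.
  source.on('end', () => parentPort.postMessage(Buffer.concat(chunks)));
`;

// Hypothetical helper: resolves with a Buffer produced in a worker thread.
function renderInWorker(text) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: { text } });
    worker.once('message', (bytes) => {
      // The message is a Uint8Array; view the same memory as a Buffer.
      resolve(Buffer.from(bytes.buffer, bytes.byteOffset, bytes.byteLength));
    });
    worker.once('error', reject);
  });
}

renderInWorker('<div>Hello World!</div>')
  .then((buf) => console.log('received', buf.length, 'bytes'))
  .catch((err) => console.error('err', err));
```

If the caller really needs a stream, the main thread can wrap the received Buffer with Readable.from([buf]).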

Hope this helps.

Instead of trying to give the Stream to the main thread, why not just pipe it?

Create a MessageChannel shared between them.
The parent implements Readable, wrapping its MessagePort listening to .on('message').
The thread has an interface that implements Writable and basically passes any data from write() straight to .postMessage().

Don't forget to add a little more implementation around write() for its return value. I'd always return false in the child thread's Writable and forward every 'drain' event from the main thread's Readable back down to the child using postMessage(). When the Writable that our Readable is piped into on the main thread returned true from write(), no 'drain' event will follow, so fudge one and send it anyway.

Now you have a way to stream to the main thread.
With the original Stream you wanted to send from the child thread, just .pipe() it to your Writable.
In the main thread simply read from your Readable or pipe it on as if you had it for real.
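
A minimal sketch of this approach, using only built-in modules, might look like the following. It simplifies the flow control described above: rather than forwarding 'drain' events as such, the worker's Writable holds each write callback until the main thread acknowledges the chunk. The names streamFromWorker, readableFromPort and workerSource, the message shapes, and the in-memory source stream are all assumptions made for the example.

```javascript
const { Worker, MessageChannel } = require('worker_threads');
const { Readable } = require('stream');

// Worker side: a Writable that forwards every chunk over the MessagePort.
const workerSource = `
  const { workerData } = require('worker_threads');
  const { Readable, Writable } = require('stream');
  const { port, parts } = workerData;

  let release = null; // callback of the write currently awaiting an ack
  port.on('message', () => {
    const cb = release;
    release = null;
    if (cb) cb();
  });

  const sink = new Writable({
    write(chunk, encoding, callback) {
      release = callback; // continue only once the main thread acknowledges
      port.postMessage({ chunk });
    },
    final(callback) {
      port.postMessage({ done: true });
      callback();
    },
  });

  // Stand-in for the real stream you would pipe from the worker.
  Readable.from(parts.map((p) => Buffer.from(p))).pipe(sink);
`;

// Main side: a Readable fed by messages arriving on the port.
function readableFromPort(port) {
  let ackPending = false;
  const readable = new Readable({
    read() {
      // The consumer wants more data: release the worker if it is waiting.
      if (ackPending) {
        ackPending = false;
        port.postMessage('ack');
      }
    },
  });
  port.on('message', (msg) => {
    if (msg.done) {
      readable.push(null);
      port.close(); // also lets the worker exit
      return;
    }
    // Chunks arrive as Uint8Array; push() returns false when the buffer is full.
    if (readable.push(Buffer.from(msg.chunk))) port.postMessage('ack');
    else ackPending = true;
  });
  return readable;
}

function streamFromWorker(parts) {
  const { port1, port2 } = new MessageChannel();
  const worker = new Worker(workerSource, {
    eval: true,
    workerData: { port: port2, parts },
    transferList: [port2], // ports must be transferred, not cloned
  });
  const readable = readableFromPort(port1);
  worker.once('error', (err) => readable.destroy(err));
  return readable;
}
```

In the main thread, streamFromWorker(['Hello ', 'World!']).pipe(process.stdout) then behaves like any other readable stream.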

I have a similar requirement. I've built a threadpool that sends data back and forth between threads using shared memory. The next step is to add streams. There are two reasons for this: 1) handle data of any size (vs statically allocated buffer) and 2) cut down on memory utilization (i.e. a static buffer of max chunk size that is used serially). The following repo is close to what I need, but I have to go in both directions and control when threads are created/destroyed, etc. https://github.com/pinojs/thread-stream. I think your requirement is to go in the reverse direction also. Nevertheless, this may help with finding a solution.
