
I have fetched the data from the table tb_project_milestones and want to insert each projectMilestoneRow into a table tb_xyz using streams. I checked the documentation, but couldn't find how to implement it. Has anyone implemented reading through streams and inserting through streams in MySQL?

let insertProjectMilestones = [];
const getProjectMilestones = executeQueryStream.query('SELECT * FROM tb_project_milestones WHERE project_id = ? ');

getProjectMilestones
.on('error', function(err) {
  // Handle error, an 'end' event will be emitted after this as well
})
.on('result', function(projectMilestoneRow) {
  // Pausing the connection is useful if your processing involves I/O
  connection.pause();

  processRow(projectMilestoneRow, function() {
    _.each(payload.projects, (project_id)=> {
      _.each(projectMilestoneRow, (el)=> {
        insertProjectMilestones.push([el.project_milestone_id, el.name, el.prefix, el.short_name, el.description, el.pre_requisites, project_id,
          el.milestone_template_id, el.generic_milestone_id, el.planned_date, el.actual_date, el.forecast_date,
          el.planned_date_only, el.forecast_date_only, el.actual_date_only, el.planned_time_only, el.forecast_time_only, el.actual_time_only,
          el.planned_date_formula, el.actual_date_formula, el.forecast_date_formula, el.planned_date_is_active, el.forecast_date_is_active,
          el.actual_date_is_active, el.creation_datetime, el.allow_notes, el.forecast_date_allow_notes, el.actual_date_allow_notes,
          el.planned_date_allow_notes, 0, el.requires_approval]);
      });
    });

    connection.resume();
  });
})
.on('end', function() {
  // all rows have been received
});

EDIT

I used streams in this case because millions of records are fetched from tb_project_milestones, manipulated, collected into an array and then pushed into another table.

Considering that pushing this many rows into the array would increase Node's memory usage, I thought of using a stream here.

Is a stream the better choice, or could I just implement a batch insert in the DB using transactions?
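The batch-insert alternative I have in mind would be roughly the sketch below (assuming the mysql2/promise API; the shortened column list and the chunk size of 1000 are just placeholders):

// Rough sketch of a chunked batch insert inside a transaction (mysql2/promise assumed).
const mysql = require('mysql2/promise');

async function batchInsert(rows, chunkSize = 1000) {
  const connection = await mysql.createConnection({ /* connection settings */ });
  try {
    await connection.beginTransaction();
    for (let i = 0; i < rows.length; i += chunkSize) {
      const chunk = rows.slice(i, i + chunkSize);
      // Bulk VALUES insert; each element of `chunk` is an array of column values.
      // The column list is shortened here and would have to match the full row arrays.
      await connection.query(
        'INSERT INTO tb_xyz (project_milestone_id, name, prefix, short_name) VALUES ?',
        [chunk]
      );
    }
    await connection.commit();
  } catch (err) {
    await connection.rollback();
    throw err;
  } finally {
    await connection.end();
  }
}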

2 Answers


You can use a knex stream and async iteration (ES2018 / Node 10+) for that:

const knex = require("knex");

const knexClient = knex(someMysqlClientSettings);

const dbStream = knexClient("tb_project_milestones")
    .where({ project_id: projectId })
    .stream();

for await (const row of dbStream) {
    const processedRowObj = process(row);
    await knexClient("tb_xyz").insert(processedRowObj);
}
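If inserting one row at a time turns out to be too slow for millions of rows, you could buffer the processed rows and flush them in batches instead (a sketch on top of the code above; batchSize is arbitrary, and knex's batchInsert splits the rows into chunks of that size):

// Sketch: buffer processed rows and insert them in batches rather than one by one.
const batchSize = 1000;
let buffer = [];

for await (const row of dbStream) {
    buffer.push(process(row));
    if (buffer.length >= batchSize) {
        await knexClient.batchInsert("tb_xyz", buffer, batchSize);
        buffer = [];
    }
}

// Flush whatever is left once the stream ends.
if (buffer.length > 0) {
    await knexClient.batchInsert("tb_xyz", buffer, batchSize);
}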



Wouldn't it be much faster and simpler to perform the single SQL statement:

INSERT INTO tb_xyz (...)
    SELECT ... FROM tb_project_milestones;

That way, the data is not shoveled to the client only to be turned around and shoveled back to the server.

And you could do transformations (expressions in the SELECT) and/or filtering (WHERE in SELECT) at the same time.

MySQL will impose essentially no limits on how big the table can be.
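Run from Node, that could be a single parameterized statement (a sketch assuming a promise-based connection such as mysql2/promise; the column list is abbreviated and newProjectId stands in for the project the rows are being copied to):

// One round trip: MySQL copies and transforms the rows server-side; nothing streams back to Node.
// Column list abbreviated; it is assumed tb_xyz has matching columns.
await connection.query(
  `INSERT INTO tb_xyz (project_milestone_id, name, prefix, short_name, project_id)
   SELECT project_milestone_id, name, prefix, short_name, ?
   FROM tb_project_milestones
   WHERE project_id = ?`,
  [newProjectId, projectId]
);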

2 Comments

Bulk INSERT has a limit. Before inserting into the DB the packet size is checked; you can increase it, but it still has a limit. MySQL cannot insert millions of rows at once, so you have to divide them into chunks and implement transactions to check the validity of the data. So directly inserting using this approach is not feasible here. @Rick James
INSERT INTO .. SELECT .. has no limit; INSERT INTO .. VALUES (..),(..),... does have a limit on the size of the text of the query.
