0

I'm trying to write a Lambda function that is triggered by an S3 bucket when a CSV file is uploaded, reads that file, and parses it.

I'm using: Node 14.x

This is the code:

import { S3Event } from 'aws-lambda';
import { S3 } from 'aws-sdk';
import * as csv from 'fast-csv';

const s3 = new S3({ apiVersion: 'latest' });

/**
 * Lambda entry point for S3 event notifications.
 *
 * For every record in the event it opens a read stream on the referenced
 * S3 object and feeds that stream to the fast-csv parser, logging each
 * parsed row, any parser error, and the final row count.
 *
 * @param event - S3 notification payload; only `event.Records` is read.
 * @param context - Not used in this function body.
 * @param cb - Not used in this function body.
 */
export async function hello(event: S3Event, context, cb) {
  event.Records.forEach(async (record) => {
    const bucket = record.s3.bucket.name;
    // Turn '+' into spaces and URL-decode the object key before using it.
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));

    const params: S3.GetObjectRequest = {
      Bucket: bucket,
      Key: key,
    };

    // Build the GetObject request and expose its body as a readable stream.
    const stream = s3.getObject(params).createReadStream();

    // Debug output: dumps the stream object itself, not the file contents.
    console.log({ stream });

    // Parse the stream as CSV, treating the first row as column headers.
    // Rows, errors and the final count are reported through event listeners.
    csv.parseStream(stream, {
      headers: true
    }).on('data', data => { console.log(data); })
      .on('error', error => console.error(error))
      .on('end', (rowCount: number) => console.log(`Parsed ${rowCount} rows`));

    // Logged as soon as the listeners above are attached
    // (the message is Portuguese for "process 01 finished!").
    console.log('processo 01 acabou!');
  });
}

When I execute this Lambda, I don't get any parsed output. The console.log({ stream }) call only prints a PassThrough object:

stream: PassThrough {
    _readableState: ReadableState {
      objectMode: false,
      highWaterMark: 16384,
      buffer: BufferList { head: null, tail: null, length: 0 },
      length: 0,
      pipes: [],
      flowing: null,
      ended: false,
      endEmitted: false,
      reading: false,
      sync: false,
      needReadable: false,
      emittedReadable: false,
      readableListening: false,
      resumeScheduled: false,
      errorEmitted: false,
      emitClose: true,
      autoDestroy: true,
      destroyed: false,
      errored: null,
      closed: false,
      closeEmitted: false,
      defaultEncoding: 'utf8',
      awaitDrainWriters: null,
      multiAwaitDrain: false,
      readingMore: false,
      dataEmitted: false,
      decoder: null,
      encoding: null,
      [Symbol(kPaused)]: null
    },
    _events: [Object: null prototype] { prefinish: [Function: prefinish] },
    _eventsCount: 1,
    _maxListeners: undefined,
    _writableState: WritableState {
      objectMode: false,
      highWaterMark: 16384,
      finalCalled: false,
      needDrain: false,
      ending: false,
      ended: false,
      finished: false,
      destroyed: false,
      decodeStrings: true,
      defaultEncoding: 'utf8',
      length: 0,
      writing: false,
      corked: 0,
      sync: true,
      bufferProcessing: false,
      onwrite: [Function: bound onwrite],
      writecb: null,
      writelen: 0,
      afterWriteTickInfo: null,
      buffered: [],
      bufferedIndex: 0,
      allBuffers: true,
      allNoop: true,
      pendingcb: 0,
      prefinished: false,
      errorEmitted: false,
      emitClose: true,
      autoDestroy: true,
      errored: null,
      closed: false
    },
    allowHalfOpen: true,
    [Symbol(kCapture)]: false,
    [Symbol(kTransformState)]: {
      afterTransform: [Function: bound afterTransform],
      needTransform: false,
      transforming: false,
      writecb: null,
      writechunk: null,
      writeencoding: null
    }
  }
}

I have a picture from my CloudWatch

enter image description here

Can anyone help me, and tell me what I'm doing wrong?

1 Answer 1

1

The issue with your code is that it's not correctly dealing with the asynchronous nature of JavaScript. Specifically, your code is exiting before any asynchronous activity has completed.

Your Lambda function is async so it should return a promise that is ultimately settled (fulfilled or rejected) when your processing of the S3 object(s) has completed. This allows the AWS Lambda runtime environment to await completion.

For example:

exports.handler =  async function(event, context) {
  const promises = event.Records.map((record) => {
    const Bucket = record.s3.bucket.name;
    const Key = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
    const params = { Bucket, Key };
    const stream = s3.getObject(params).createReadStream();

    return new Promise(function(resolve, reject) {
      csv.parseStream(stream, {
        headers: true
      }).on('data', (data) => {
        console.log(data);
      }).on('error', (error) => {
        console.error(error);
        reject(error);
      }).on('end', (rows) => {
        console.log(`Parsed ${rows} rows`);
        resolve(rows);
      });
    });
  });

  return Promise.all(promises);
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.