15

I have created a subscription filter in CloudWatch log group and made it stream to my lambda function, but am getting an error in my lambda function.

Code:

import boto3
import binascii
import json
import base64
import zlib

def stream_gzip_decompress(stream):
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)  # offset 32 to skip the header
    foo=''
    for chunk in stream:
        rv = dec.decompress(chunk)
        if rv:
            foo += rv
    return foo

def lambda_handler(event, context):
    # Decode and decompress the AWS Log stream to extract json object
    stream=json.dumps(event['awslogs']['data'])
    f = base64.b64decode(stream)
    payload=json.loads(stream_gzip_decompress(f.decode(f)))
    print(payload)

Error:

Response:

{
  "errorMessage": "decode() argument 1 must be str, not bytes",
  "errorType": "TypeError",
  "stackTrace": [
    [
      "/var/task/lambda_function.py",
      34,
      "lambda_handler",
      "payload=json.loads(stream_gzip_decompress(f.decode(f)))"
    ]
  ]
}

Any help or clue would be greatly appreciated! If you have any alternative solution please suggest. My requirement is to handle logs from CloudWatch using lambda.

Thanks in Advance !!

1
  • 1
    Your failure is beacuse you are trying to json.dumps(event['awslogs']['data']) where data is the base 64 encoded gzip-compressed list data. just pass the event['awslogs']['data'] data straight to decode as in P. Ryan's answer Commented Sep 19, 2018 at 22:48

3 Answers 3

50

In case anyone else is looking for help with this topic.

I took a slightly different approach, but I did see an 'awslog' key in the event.

Here is a sample that I was successful with. Python 3.6 Lambda. Setup cloudwatch trigger to call the lambda

import gzip
import json
import base64


def lambda_handler(event, context):
    print(f'Logging Event: {event}')
    print(f"Awslog: {event['awslogs']}")
    cw_data = event['awslogs']['data']
    print(f'data: {cw_data}')
    print(f'type: {type(cw_data)}')
    compressed_payload = base64.b64decode(cw_data)
    uncompressed_payload = gzip.decompress(compressed_payload)
    payload = json.loads(uncompressed_payload)

    log_events = payload['logEvents']
    for log_event in log_events:
        print(f'LogEvent: {log_event}')
Sign up to request clarification or add additional context in comments.

2 Comments

I like this answer better than the other as it is simpler and clearer. Thanks!
Nice one. But, How to get the function name from where these logs are coming?
5

Below is the outline I normally follow when processing CloudWatch Logs being sent to AWS Lambda.

import gzip
import json
from StringIO import StringIO

def lambda_handler(event, context):
    cw_data = str(event['awslogs']['data'])
    cw_logs = gzip.GzipFile(fileobj=StringIO(cw_data.decode('base64', 'strict'))).read()
    log_events = json.loads(cw_logs)
    for log_event in logevents['logEvents']:
        # Process Logs

I see that you are treating the data sent to the AWS Lambda as a JSON object. You first want to base64 decode then unzip the data. After decoding and decompressing you should have the JSON object with the log information.

3 Comments

Appreciate your effort to answer the question, but am getting keyerror as dict type "event" does not have key 'awslogs'
format should be {'awslogs': {'data': <base 64 encoded gzip-compressed list as a str> } }. Im assuming the str(event['awslogs'])['data'] is a typo and meant to be str(event['awslogs']['data'])
For python 3, StringIO import should be: from io import StringIO
0

Here is quasar's answer converted to Python 3.

import gzip
import json
import base64
from io import BytesIO

cw_data = str(event['awslogs']['data'])
cw_logs = gzip.GzipFile(fileobj=BytesIO(base64.b64decode(cw_data, validate=True))).read()
log_events = json.loads(cw_logs)
for log_event in log_events['logEvents']:
    # Process Logs

The main change is using io.BytesIO and a different base64 decode function to get to the log event data.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.