
I am trying to push logs from a multiprocessing job into ECS S3. Below are my code snippets:

logger.py

import logging
from S3_log_handler import S3LogHandler

def setup_logger():
    # Set up the logger
    logger = logging.getLogger('s3_logger')
    logger.setLevel(logging.DEBUG)  # Capture all log levels

    # Set up the custom S3 logging handler
    s3_handler = S3LogHandler()

    # Create a formatter for the log messages (timestamp + log level + message)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    s3_handler.setFormatter(formatter)

    # Also echo logs to the console
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    # Add both handlers to the logger
    logger.addHandler(s3_handler)
    logger.addHandler(stream_handler)

    return logger

S3_log_handler.py:

import logging
import boto3
from multiprocessing import Lock

lock = Lock()


class S3LogHandler(logging.Handler):

    s3 = boto3.client('s3', ...)

    def __init__(self):
        super().__init__()
        self.bucket_name = 'bucket_name'
        self.prefix = 's3_path/log_sample_test.log'
        self.s3_client = S3LogHandler.s3

    def emit(self, record):
        log_message = self.format(record) + '\n'
        log_filename = self.prefix

        with lock:
            try:
                try:
                    current_content = self.s3_client.get_object(Bucket=self.bucket_name, Key=log_filename)
                    current_data = current_content['Body'].read().decode('utf-8')
                except self.s3_client.exceptions.NoSuchKey:
                    # If the log file does not exist, start with an empty string
                    current_data = ""

                # Append the new log message to the existing log content
                # (log_message already ends with a newline)
                updated_data = current_data + log_message
                # Upload log message to S3
                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=log_filename,
                    Body=updated_data
                )
                print(f"Log uploaded to s3://{self.bucket_name}/{log_filename}")
            except Exception as e:
                print(f"Failed to upload log to S3: {str(e)}")

test_script.py:

logger = setup_logger()

result = submodule_function(json_data, logger)

Inside submodule_function:

from concurrent.futures import ProcessPoolExecutor
...
with ProcessPoolExecutor(max_workers=3) as executor:
    for p, result in zip(partition_data, executor.map(target_function, partition_data, itertools.repeat(logger))):
        ....

The issue I am facing is that not all logs emitted inside target_function are getting added to the log file in S3. My guess is that log entries are getting overwritten, even though I am using multiprocessing.Lock.

How can I make sure no parallel processes write to the file in S3 at the same time in emit()?
