I am trying to push logs from a multiprocessing job into an S3 bucket (ECS). Following is my code snippet:
logger.py
import logging
from S3_log_handler import S3LogHandler
def setup_logger():
    """Create and configure the 's3_logger' logger.

    Attaches the custom S3 handler plus a console StreamHandler so log
    records are both uploaded to S3 and echoed locally.

    Returns:
        logging.Logger: the configured logger instance.
    """
    logger = logging.getLogger('s3_logger')
    logger.setLevel(logging.DEBUG)  # Capture all log levels

    # Guard against duplicate handlers: getLogger returns the same object on
    # every call, so calling setup_logger() twice (e.g. after a worker
    # re-imports this module) would otherwise double every log line.
    if logger.handlers:
        return logger

    # Set up the custom S3 logging handler
    s3_handler = S3LogHandler()

    # Timestamp + log level + message
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    s3_handler.setFormatter(formatter)

    # BUG FIX: stream_handler was referenced but never created, which raised
    # NameError at setup time. Create it (with the same format) before adding.
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    logger.addHandler(s3_handler)
    logger.addHandler(stream_handler)
    return logger
S3_log_handler.py:
import logging
import boto3
from multiprocessing import Lock
# Module-level lock guarding the S3 read-modify-write in S3LogHandler.emit().
# NOTE(review): this only serializes threads (and fork-started children that
# inherit this exact object). With the "spawn" start method each
# ProcessPoolExecutor worker re-imports this module and gets its OWN lock,
# so it provides NO cross-process mutual exclusion — likely the cause of the
# lost log lines; confirm your platform's start method.
lock = Lock()
class S3LogHandler(logging.Handler):
    """Logging handler that appends each record to a single log object in S3.

    Each emit() performs a read-modify-write cycle: download the current log
    object, append the new line, and upload the whole object back.

    NOTE(review): S3 has no atomic append. Even with a lock, two *processes*
    can interleave get_object/put_object and silently drop each other's
    lines — the module-level multiprocessing.Lock is not shared across
    spawn-started workers. The robust pattern is a logging.handlers
    QueueHandler/QueueListener funnelling all workers into one writer process
    (see the Python logging cookbook), or one S3 key per process.
    """

    # Shared client, created once at class-definition (import) time.
    # The `...` is a placeholder for the real client configuration.
    s3 = boto3.client('s3', ...)

    def __init__(self):
        super().__init__()
        self.bucket_name = 'bucket_name'          # target S3 bucket
        self.prefix = 's3_path/log_sample_test.log'  # key of the single log object
        self.s3_client = S3LogHandler.s3

    def emit(self, record):
        """Format `record` and append it to the S3 log object."""
        log_message = self.format(record) + '\n'
        log_filename = self.prefix  # was f"{self.prefix}" — redundant f-string
        # Serializes threads within this process only; see class docstring.
        with lock:
            try:
                try:
                    current_content = self.s3_client.get_object(
                        Bucket=self.bucket_name, Key=log_filename
                    )
                    current_data = current_content['Body'].read().decode('utf-8')
                except self.s3_client.exceptions.NoSuchKey:
                    # Log file does not exist yet: start with an empty string.
                    current_data = ""
                # BUG FIX: log_message already ends in '\n'; the old code
                # inserted an extra '\n' here, producing blank lines between
                # entries and a leading blank line on the first write.
                updated_data = current_data + log_message
                # Upload the full, appended log content back to S3.
                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=log_filename,
                    Body=updated_data
                )
                print(f"Log uploaded to s3://{self.bucket_name}/{log_filename}")
            except Exception as e:
                # Best-effort: never let logging crash the application.
                print(f"Failed to upload log to S3: {str(e)}")
test_script.py:
logger = setup_logger()
result = submodule_function(json_data, logger)
Inside submodule_function:
from concurrent.futures import ProcessPoolExecutor
...
with ProcessPoolExecutor(max_workers=3) as executor:
for p, (result) in zip(partition_data, executor.map(target_function, partition_data, itertools.repeat(logger))):
....
The issue I am facing is that not all log lines produced inside target_function end up in the log file in S3. My guess is that concurrent writes are overwriting each other, even though I am using multiprocessing.Lock.
How can I make sure no two parallel processes write to the S3 file at the same time inside emit()?