I have this code which runs for a long time because it processes about 6000 entries. I am trying to run it parallelly so it takes less time. The multiprocessing code that I wrote is not working as expected. For loop in the function lambda_handler is what needs to parallelized. I am still new to python, so any help will be appreciated.
Here's my code
import json
import boto3
from pprint import pprint
import os
import botocore
from datetime import datetime
import csv
from botocore.client import Config
import logging
import urllib3
import sys
import time
from boto3.session import Session
from botocore.client import Config
import multiprocessing as mp
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
config = Config(connect_timeout=7200,
retries={'max_attempts': 5},
proxies={'http': os.environ.get('HTTP_PROXY'),'https': os.environ.get('HTTPS_PROXY'),}
)
LOGGER=logging.getLogger()
LOGGER.setLevel(logging.INFO)
instance_id = os.environ.get('instance_id')
region_name = os.environ.get('Region')
s3_bucket = os.environ.get('s3_bucket')
env = os.environ.get('export_environment')
MaxResults = 4
# Multiprocessing
pool = mp.Pool(4)
def getUserList(instance_id: str, region_name: str, env: str):
users_all = ''
client = boto3.client('connect',region_name=region_name, verify=False, config=config)
#get list of Amazon Connect users - first batch
response = client.list_users(
InstanceId=instance_id,
MaxResults=MaxResults
)
users=response['UserSummaryList']
while "NextToken" in response:
response = client.list_users(InstanceId=instance_id, MaxResults=MaxResults, NextToken=response["NextToken"])
users.extend(response['UserSummaryList'])
users_all = [*users_all,*users]
return users_all
def lambda_handler(instance_id: str, region_name: str, env: str, s3_bucket):
client = boto3.client('connect',region_name="us-east-1",verify=False, config=config)
#get list of Amazon Connect users
users=getUserList(instance_id,region_name,env)
# This loop needs to be parallelized
for user in users:
#time.sleep(0.1)
user_id=user['Id']
user_name=user['Username']
response=client.describe_user(
InstanceId=instance_id,
UserId=user_id
)
user_firstname=response['User']['IdentityInfo']['FirstName']
user_lastname=response['User']['IdentityInfo']['LastName']
user_name=response['User']['Username']
user_routing_profile_id=response['User']['RoutingProfileId']
user_security_profile_ids=response['User']['SecurityProfileIds']
#user_email=response['User']['IdentityInfo']['Email']
user_phonetype=response['User']['PhoneConfig']['PhoneType']
user_desk_number=response['User']['PhoneConfig']['DeskPhoneNumber']
user_autoaccept=response['User']['PhoneConfig']['AutoAccept']
user_acw_timeout=response['User']['PhoneConfig']['AfterContactWorkTimeLimit']
try:
user_hierarchy_group_id=response['User']['HierarchyGroupId']
except KeyError as e:
user_hierarchy_group_id=""
security_profile_name_list=''
hierarchy_group_name=""
user_info=""+user_firstname+","+user_lastname+","+user_name+","+routing_profile_name+","+security_profile_name_list[1:]+","+user_phonetype+","+user_desk_number+","+str(user_autoaccept)+","+str(user_acw_timeout)+","+hierarchy_group_name+"\r\n"
print(user_info)
results = pool.starmap(lambda_handler, [(instance_id,region_name,env,s3_bucket)])
pool.close()