I have about 5000 CSV files in one directory, containing minute-level stock data. Each file is named after its symbol, e.g. the data for AAPL is in AAPL.csv.
I want to do some cleanup and editing on each of them: in this case, convert one column that contains Unix epoch timestamps into readable date and time columns, and rename another column.
I tried to use multiprocessing to speed this up, but my first attempt just killed my MacBook.
I run it inside VS Code's Jupyter notebook, if that matters.
What did I do wrong, how can I improve it, and how should similar tasks be handled in Python and pandas?
Thank you!
Here is my code.
# Define the operation that will be used by the multiprocessing workers
def clean_up(file, fail_list):
    print('Working on {}'.format(file))
    stock = pd.read_csv('./Data/minutes_data/' + file)
    try:
        # Convert the epoch datetime column into readable Date and Time columns
        stock['Date'] = stock.apply(lambda row: epoch_converter.get_date_from_mill_epoch(row['datetime']), axis=1)
        stock['Time'] = stock.apply(lambda row: epoch_converter.get_time_from_mill_epoch(row['datetime']), axis=1)
        # Rename the 'Unnamed: 0' column to 'Minute'
        stock.rename(columns={'Unnamed: 0': 'Minute'}, inplace=True)
        # Write it back to a new file
        stock.to_csv('./Data/working_data/' + file)
    except:
        print('{} not successful'.format(file))
        fail_list = fail_list.append(file)
        fail_list.to_csv('./failed_list.csv')
# Get the file list to work on
file_list = os.listdir('./Data/minutes_data/')
# Prepare the failed list
fail_list = pd.DataFrame([])
# Loop through each file, starting one process per file
processes = []
for file in file_list:
    p = multiprocessing.Process(target=clean_up, args=(file, fail_list,))
    processes.append(p)
    p.start()
for process in processes:
    process.join()
Update: here is a sample of the CSV files:
,open,high,low,close,volume,datetime
0,21.9,21.9,21.9,21.9,200,1596722940000
0,20.0,20.0,19.9937,19.9937,200,1595266500000
1,20.0,20.0,19.9937,19.9937,500,1595266800000
2,20.0,20.0,19.9937,19.9937,1094,1595267040000
3,20.0,20.0,20.0,20.0,200,1595268240000
Final Update:
Combining the answers from @furas and @jsmart, the script managed to reduce the processing time for 5000 CSVs from hours to under 1 minute (on a 6-core i9 MacBook Pro). I'm happy. You guys are awesome. Thanks!
The final script is here:
import pandas as pd
import numpy as np
import os
import multiprocessing
import logging

logging.basicConfig(filename='./log.log', level=logging.DEBUG)

file_list = os.listdir('./Data/minutes_data/')

def cleanup(file):
    print('Working on ' + file)
    stock = pd.read_csv('./Data/minutes_data/' + file)
    try:
        # Convert the epoch-millisecond datetime column into readable Date and Time columns
        stock['Date'] = pd.to_datetime(stock['datetime'], unit='ms', utc=True).dt.tz_convert('America/New_York').dt.date
        stock['Time'] = pd.to_datetime(stock['datetime'], unit='ms', utc=True).dt.tz_convert('America/New_York').dt.time
        # Rename the 'Unnamed: 0' column to 'Minute'
        stock.rename(columns={'Unnamed: 0': 'Minute'}, inplace=True)
        # Write it back to a new file
        stock.to_csv('./Data/working_data/' + file)
    except:
        print(file + ' Not successful')
        logging.warning(file + ' Not complete.')

pool = multiprocessing.Pool()
pool.map(cleanup, file_list)
With multiprocessing.Process you create 5000 processes at the same time. It is better to use e.g. multiprocessing.Pool(10) to run only 10 processes at a time; when one process finishes a file, the pool reuses it for the next file in the list (the final script above uses Pool for exactly this reason). With Pool you can also use return to send a failed file back to the main process, which can then save it. Writing to the same failed_list.csv from different processes may give wrong results. Besides, processes don't share variables: every process gets its own copy of the empty fail_list, so each one would save only a single value and overwrite whatever was previously in './failed_list.csv'.
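For reference, here is a minimal sketch of that Pool-with-return pattern combined with the vectorized pd.to_datetime conversion. The pool size of 10, the paths, and the way failures are collected are illustrative assumptions, not code from the original answers:

import os
import multiprocessing
import pandas as pd

SRC = './Data/minutes_data/'   # assumed input directory, as in the question
DST = './Data/working_data/'   # assumed output directory, as in the question

def cleanup(file):
    try:
        stock = pd.read_csv(SRC + file)
        # Vectorized conversion of epoch milliseconds to New York date and time
        ts = pd.to_datetime(stock['datetime'], unit='ms', utc=True).dt.tz_convert('America/New_York')
        stock['Date'] = ts.dt.date
        stock['Time'] = ts.dt.time
        stock.rename(columns={'Unnamed: 0': 'Minute'}, inplace=True)
        stock.to_csv(DST + file)
        return None              # success
    except Exception:
        return file              # report the failed file back to the main process

if __name__ == '__main__':
    file_list = os.listdir(SRC)
    with multiprocessing.Pool(10) as pool:   # at most 10 worker processes at a time
        results = pool.map(cleanup, file_list)
    # Only the main process writes the failure list, so there are no concurrent writes
    failed = [f for f in results if f is not None]
    pd.DataFrame({'file': failed}).to_csv('./failed_list.csv', index=False)

Collecting the return values in the parent process sidesteps the shared-variable problem described above, since only one process ever touches failed_list.csv.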