
So I have about 5000 CSV files under one directory, containing stocks' minute data. Each file is named after its symbol, e.g. stock AAPL is named AAPL.csv.

I'm trying to do some cleanup and editing on each of them. In this case, I'm trying to convert one column that contains Unix epoch datetimes into readable date and time columns. I also want to change the label of one column.

I tried to use multiprocessing to speed things up, but the first attempt just killed my MacBook.

I run it inside VS Code's Jupyter notebook, if that matters.

I wonder what I did wrong and how to improve, and how to handle similar tasks in Python and pandas.

Thank you!

Here is my code.

# Define operations will be used in multiprocessing handling
def clean_up(file,fail_list):
    print('Working on {}'.format(file))
    stock = pd.read_csv('./Data/minutes_data/' + file)

    try:
        #Convert datetime columns into readable date and time column
        stock['Date'] = stock.apply(lambda row: epoch_converter.get_date_from_mill_epoch(row['datetime']), axis=1)
        stock['Time'] = stock.apply(lambda row: epoch_converter.get_time_from_mill_epoch(row['datetime']), axis=1)

        #Rename 'Unnamed: 0' column into 'Minute'
        stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)

        #Write it back to new file
        stock.to_csv('./Data/working_data/' + file)
    except:
        print('{} not successful'.format(file))
        fail_list = fail_list.append(file)
        fail_list.to_csv('./failed_list.csv')



#Get file list to working on.
file_list = os.listdir('./Data/minutes_data/')

#prepare failed_list
fail_list = pd.DataFrame([])
#Loop through each file
processes = []
for file in file_list:
    p = multiprocessing.Process(target=clean_up, args=(file,fail_list,))
    processes.append(p)
    p.start()

for process in processes:
    process.join()

Update: CSV_FILE_SAMPLE

,open,high,low,close,volume,datetime
0,21.9,21.9,21.9,21.9,200,1596722940000
0,20.0,20.0,19.9937,19.9937,200,1595266500000
1,20.0,20.0,19.9937,19.9937,500,1595266800000
2,20.0,20.0,19.9937,19.9937,1094,1595267040000
3,20.0,20.0,20.0,20.0,200,1595268240000

Final Update:

Combining the answers from @furas and @jsmart, the script managed to reduce the processing time of 5000 CSVs from hours to under 1 minute (on a 6-core i9 MacBook Pro). I'm happy. You guys are awesome. Thanks!

The final script is here:

import pandas as pd
import numpy as np
import os
import multiprocessing
import logging

logging.basicConfig(filename='./log.log',level=logging.DEBUG)

file_list = os.listdir('./Data/minutes_data/')

def cleanup(file):
    print('Working on ' + file)
    stock = pd.read_csv('./Data/minutes_data/' + file)
    
    try:
        #Convert datetime columns into readable date and time column
        stock['Date'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.date
        stock['Time'] = pd.to_datetime(stock['datetime'],unit='ms',utc=True).dt.tz_convert('America/New_York').dt.time

        #Rename 'Unnamed: 0' column into 'Minute'
        stock.rename(columns={'Unnamed: 0':'Minute'}, inplace=True)

        #Write it back to new file
        stock.to_csv('./Data/working_data/' + file)
    except:
        print(file + ' Not successful')
        logging.warning(file + ' Not complete.')



pool = multiprocessing.Pool()
pool.map(cleanup, file_list)
  • Did you get an error message? What was it? Commented Sep 5, 2020 at 6:24
  • Using multiprocessing.Process you create 5000 processes at the same time. Better to use e.g. multiprocessing.Pool(10) to run only 10 processes at the same time; when a process finishes its work, the pool gives it the next file on the list. Commented Sep 5, 2020 at 6:37
  • It's super helpful to profile first - that will let you know where the slowdown is. If it's just waiting for file IO, a thread pool might be a better option. Commented Sep 5, 2020 at 6:47
  • The documentation for multiprocessing even shows Pool in its first example. Commented Sep 5, 2020 at 6:47
  • With Pool you can also use return to send the failed file back to the main process, which can then save it. Saving to the same failed_list.csv file from different processes may give wrong results. Besides, processes don't share variables, so every process has its own copy of the empty failed_list, saves only one value, and overwrites the previous contents of './failed_list.csv'. Commented Sep 5, 2020 at 6:50

2 Answers


Using Process in a loop, you create 5000 processes at the same time.

You could use Pool to control how many processes work at the same time - the pool automatically gives a free process the next file.

It can also use return to send the name of a failed file back to the main process, which can then save the list once. Writing to the same file from many processes can produce wrong data in that file. Besides, processes don't share variables, so every process would have its own empty DataFrame and would save only its own failed file - removing the previous content.

def clean_up(file):
    try:
        # ... code ...
        return None  # if OK
    except:
        return file  # if failed
    
    
# --- main ---

# get file list to working on.
file_list = sorted(os.listdir('./Data/minutes_data/'))

with multiprocessing.Pool(10) as p:
    failed_files = p.map(clean_up, file_list)

# remove None from names
failed_files = filter(None, failed_files)

# save all
df = pd.DataFrame(failed_files)
df.to_csv('./failed_list.csv')

There is also multiprocessing.pool.ThreadPool, which uses threads instead of processes.
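
For example, a minimal sketch (assuming the same clean_up function and file_list as above) - ThreadPool is a drop-in replacement for Pool and can help when the work is mostly file I/O:

from multiprocessing.pool import ThreadPool

# Same interface as multiprocessing.Pool, but runs the work in 10 threads
# inside one process - no pickling of arguments, lighter startup.
with ThreadPool(10) as p:
    failed_files = p.map(clean_up, file_list)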

The module concurrent.futures also has ThreadPoolExecutor and ProcessPoolExecutor.
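
A rough equivalent with concurrent.futures (again assuming the clean_up and file_list defined above; swap in ThreadPoolExecutor for a thread-based version):

from concurrent.futures import ProcessPoolExecutor

# executor.map behaves like Pool.map: results come back in input order,
# so the non-None entries are the names of the failed files.
with ProcessPoolExecutor(max_workers=10) as executor:
    failed_files = [name for name in executor.map(clean_up, file_list) if name is not None]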

You can also try to do it with external modules - but I don't remember which ones would be useful.


2 Comments

Thank you @furas! I think I didn't understand how multiprocessing works in the first place. Your answer is very helpful!
I was also thinking about external modules like ray, dask, joblib, but I don't have experience with them - see also 6 Python libraries for parallel processing. On Linux I can also use the program GNU parallel with a script that works on a single file and do ls | parallel -n 10 python clean_up.py

The original post asked "...how to handle similar tasks in python and pandas."

  • Replacing .apply(..., axis=1) can increase throughput by 100x or better.
  • Here is an example with 10_000 rows of data:
%%timeit
df['date'] = df.apply(lambda x: pd.to_datetime(x['timestamp'], unit='ms'), axis=1)
792 ms ± 26.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Re-write as:

%%timeit
df['date'] = pd.to_datetime(df['timestamp'], unit='ms')
4.88 ms ± 38.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Sample data:

print(df['timestamp'].head())
0    1586863008214
1    1286654914895
2    1436424291218
3    1423512988135
4    1413205308057
Name: timestamp, dtype: int64

3 Comments

Sorry, but I don't fully understand your solution. Do you mean pandas has built-in functions to convert the epoch-milliseconds format into a readable date and time format?
Yes. The argument unit='ms' indicates that the original values are milliseconds since the epoch. And you can use .dt.date or .dt.time to pull out just the date, or just the time - see the sketch after these comments. Docs are here: to_datetime
You made my day. Thanks!
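
As a minimal sketch of the conversion described in that comment (the timestamp column name and the New York timezone are assumptions, chosen to match the sample data above):

import pandas as pd

# Hypothetical sample: 'timestamp' holds epoch milliseconds, as in the question's data.
df = pd.DataFrame({'timestamp': [1596722940000, 1595266500000]})

# Vectorized conversion: parse once, then pull out date and time separately.
converted = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('America/New_York')
df['Date'] = converted.dt.date  # datetime.date objects
df['Time'] = converted.dt.time  # datetime.time objects
print(df)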
