
I have a list of zip files that need to be processed in parallel.

This is what I'm doing to fire the worker processes:

for i in range(len(list_files)):
    p.apply_async(process_data, args=(general_pd, list_files[i], TimeStamp[i],
                                      S1_Sync, S2_Sync, S3_Sync, S4_Sync, S5_Sync,
                                      S6_Sync, S7_Sync, T1_Sync, T2_Sync, T3_Sync,
                                      T4_Sync, T5_Sync, Time_Sync))
p.close()
p.join()

S1_Sync and so on are manager lists:

S1_Sync = manager.list()
S2_Sync = manager.list()

In process_data:

def process_data(general_dic, path, time, s1, s2, s3, s4, s5, s6, s7, t1, t2, t3, t4, t5, t_sync):
    if os.path.isfile(path):
        # ... read and convert the file into df_conv (omitted) ...
        s1.append(df_conv['C_strain_COY'].T.max())
        s2.append(df_conv['C_strain_CUY'].T.max())
        s3.append(df_conv['C_strain_ROX'].T.max())
        s4.append(df_conv['C_strain_CUX'].T.max())
        s5.append(df_conv['C_strain_CMX'].T.max())
        s6.append(df_conv['C_strain_COX'].T.max())
        s7.append(df_conv['C_strain_LOX'].T.max())

        t1.append(df_conv['C_temp_CUY'].T.max())
        t2.append(df_conv['C_temp_COY'].T.max())
        t3.append(df_conv['C_temp_CUX'].T.max())
        t4.append(df_conv['C_temp_CMX'].T.max())
        t5.append(df_conv['C_temp_COX'].T.max())
        print("file does exist at this time")
    else:
        s1.append(np.nan)
        s2.append(np.nan)
        s3.append(np.nan)
        s4.append(np.nan)
        s5.append(np.nan)
        s6.append(np.nan)
        s7.append(np.nan)
        t1.append(np.nan)
        t2.append(np.nan)
        t3.append(np.nan)
        t4.append(np.nan)
        t5.append(np.nan)

If the file exists I take the maximum of each column and store it in s1, s2, etc.

After p.close() and p.join() I write the data to Excel:

print("writing data")
    result['TimeStamp'] = pd.Series(convert_proxy(Time_Sync))
    result['C_strain_COY'] = pd.Series(convert_proxy(S1_Sync))
    result['C_strain_CUY'] = pd.Series(convert_proxy(S2_Sync))
    result['C_strain_ROX'] = pd.Series(convert_proxy(S3_Sync))
    result['C_strain_CUX'] = pd.Series(convert_proxy(S4_Sync))
    result['C_strain_CMX'] = pd.Series(convert_proxy(S5_Sync))
    result['C_strain_COX'] = pd.Series(convert_proxy(S6_Sync))
    result['C_strain_LOX'] = pd.Series(convert_proxy(S7_Sync))

    result['C_temp_CUY'] = pd.Series(convert_proxy(T1_Sync))
    result['C_temp_COY'] = pd.Series(convert_proxy(T2_Sync))
    result['C_temp_CUX'] = pd.Series(convert_proxy(T3_Sync))
    result['C_temp_CMX'] = pd.Series(convert_proxy(T4_Sync))
    result['C_temp_COX'] = pd.Series(convert_proxy(T5_Sync))
    #general_pd.sort_values(by='TimeStamp')
    writer = pd.ExcelWriter(r'c:\ahmed\median_data_meter_1_max.xlsx', engine='xlsxwriter')
    # Convert the dataframe to an XlsxWriter Excel object.
    general_pd.to_excel(writer, sheet_name='Sheet1')
    # Close the Pandas Excel writer and output the Excel file.
    writer.save()

I convert each proxy list into a regular list and save it to Excel.

The problem is that the processed data ends up at random positions relative to the timestamps I generate; some values land in the wrong rows of the Excel sheet.

Is it a synchronization problem? How would I merge the data correctly from each worker?

This is how I generate the timestamps:

ts = datetime.datetime.strptime('2018-08-21', '%Y-%m-%d')
end_ts = datetime.datetime.strptime('2018-08-23', '%Y-%m-%d')
while ts < end_ts:
    print(ts)
    ts += datetime.timedelta(minutes=15)
    path = os.path.join(
        r'\\file01\vnxmo\\TIT\NAS\\Baudynamik\\_meter\\SpeedFT-meter1\\peakdata\\'
        + ts.strftime("%Y-%m-%d") + r'\\peakdata_'
        + ts.strftime("%Y%m%d_%H%M") + r'.bz2')
    TimeStamp.append(ts)
    list_files.append(path)
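
The same 15-minute grid can also be built with pandas. A sketch, assuming the loop above is meant to run from 00:15 on the start date through midnight of the end date (ts is incremented before the path is built), with base_dir standing in for the UNC prefix:

import os
import pandas as pd

timestamps = pd.date_range('2018-08-21 00:15', '2018-08-23 00:00', freq='15min')
paths = [os.path.join(base_dir, ts.strftime('%Y-%m-%d'),
                      'peakdata_' + ts.strftime('%Y%m%d_%H%M') + '.bz2')
         for ts in timestamps]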
3 Comments
  • Can you just use a process pool? It blocks until all the workers are done and keeps the objects in the same order.
  • In a process pool, how would I pass the arguments?
  • @Neil, can you write an answer with that suggestion?

1 Answer


If a job is embarrassingly parallel and no communication needs to happen between workers, then I would just use a process pool context manager. That also fixes the ordering problem: your manager lists are appended in task-completion order rather than submission order, which is why rows land in the wrong places, whereas pool.map returns results in the same order as its input. You have a lot of arguments, which can be tricky, but there are plenty of workarounds for passing them: I sometimes use nested functions or global variables to curry down to one parameter, and otherwise you can always roll the arguments into a tuple or something list-like.
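
For instance, functools.partial can freeze the constant arguments so that only the per-file argument varies. A sketch with hypothetical names (shared_config and list_of_paths stand in for your real objects); note that partials pickle, which nested functions do not, so this also works with a real process pool:

from functools import partial
from multiprocessing import Pool

def process_one(shared_config, path):
    ...  # per-file work; everything constant lives in shared_config

# freeze the constant argument so the pool only varies 'path'
worker = partial(process_one, shared_config)

with Pool(4) as pool:
    results = pool.map(worker, list_of_paths)

Rolling the arguments into tuples, the other approach, looks like this: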

from multiprocessing import Pool as ProcessPool  # real processes, for CPU-bound work
# from multiprocessing.dummy import Pool as ProcessPool  # drop-in thread pool, if you're I/O bound instead

my_zip_files = [
    ("arg1", "arg2", "arg3"),    # whatever the arguments are: roll them into a tuple
    ("arg12", "arg22", "arg23")  # and store all the tuples in a list
]

def do_something(arg):
    arg1, arg2, arg3 = arg  # recover the individual arguments by tuple unpacking
    result = _do_something(arg1, arg2, arg3)
    return result

def _do_something(arg1, arg2, arg3):
    ...  # whatever logic

with ProcessPool(4) as pool:
    result = pool.map(do_something, my_zip_files)
print(result)  # pool.map preserves input order, so results line up with my_zip_files
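
Applied to the question, the same pattern would mean having process_data return one row instead of appending to manager lists. A rough sketch (the file-reading step is omitted, and the column names are copied from the question):

import os
from multiprocessing import Pool

import numpy as np
import pandas as pd

COLS = ['C_strain_COY', 'C_strain_CUY', 'C_strain_ROX', 'C_strain_CUX',
        'C_strain_CMX', 'C_strain_COX', 'C_strain_LOX',
        'C_temp_CUY', 'C_temp_COY', 'C_temp_CUX', 'C_temp_CMX', 'C_temp_COX']

def process_data(task):
    path, ts = task
    if not os.path.isfile(path):
        return [ts] + [np.nan] * len(COLS)  # keep the row, mark it as missing
    df_conv = ...  # read and convert the file as in the question (omitted)
    return [ts] + [df_conv[c].T.max() for c in COLS]

with Pool(4) as pool:
    rows = pool.map(process_data, list(zip(list_files, TimeStamp)))

# rows come back in the same order as list_files, so no re-sorting is needed
result = pd.DataFrame(rows, columns=['TimeStamp'] + COLS)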

8 Comments

What if I have one list that is the iterable, and the other arguments are lists passed as objects? I'm trying this:
y = parmap.map(process_data, list_files, S1_Sync, S2_Sync, S3_Sync, S4_Sync, S5_Sync, S6_Sync, S7_Sync, T1_Sync, T2_Sync, T3_Sync, T4_Sync, T5_Sync)
Iterables should be handled fine. Otherwise just cast them: list(iterable).
And you can pass lists as elements of the list that is the argument. Am I understanding you correctly?
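
If you end up with several per-task arguments like that, Pool.starmap is another option: it unpacks each tuple into separate parameters, so you don't index into arg yourself. A small sketch with hypothetical names (process and shared):

from multiprocessing import Pool

def process(path, ts, shared):
    ...  # per-file work; 'shared' carries the common objects

# pair each path with its timestamp and repeat the shared state per task
tasks = [(path, ts, shared) for path, ts in zip(list_files, TimeStamp)]

with Pool(4) as pool:
    results = pool.starmap(process, tasks)  # results keep the order of tasks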
