I have a list of zip files that needs to be processed in parallel.
This is how I submit the work to the pool:
# Submit one task per input file.
#
# NOTE(review): apply_async tasks COMPLETE in nondeterministic order, so the
# workers' append() calls on the shared lists interleave arbitrarily — this is
# why values end up next to the wrong timestamps. Two reliable fixes:
#   1. have process_data *return* its values and collect the AsyncResult
#      objects here, calling .get() on them in submission order; or
#   2. pass each task its index and have it write into pre-sized lists
#      (e.g. manager.list([None] * len(list_files))) at that index.
for file_path, stamp in zip(list_files, TimeStamp):
    p.apply_async(process_data,
                  args=(general_pd, file_path, stamp,
                        S1_Sync, S2_Sync, S3_Sync, S4_Sync,
                        S5_Sync, S6_Sync, S7_Sync,
                        T1_Sync, T2_Sync, T3_Sync, T4_Sync, T5_Sync,
                        Time_Sync))
p.close()   # no further submissions
p.join()    # block until every worker task has finished
`S1_Sync` and the other `*_Sync` names are manager-backed shared lists:
# Proxy lists shared between the parent process and the pool workers.
S1_Sync, S2_Sync = manager.list(), manager.list()
Inside `process_data`:
def process_data(general_dic, path, time, s1, s2, s3, s4, s5, s6, s7, t1, t2, t3, t4, t5, t_sync):
    """Pool worker: aggregate one peakdata file and append one value per
    channel to the shared manager lists s1..s7 (strain) and t1..t5 (temp).

    NOTE(review): because pool workers finish in nondeterministic order,
    these append() calls interleave arbitrarily across tasks — the shared
    lists do NOT preserve submission order. That is the likely cause of
    values landing at the wrong timestamps in the output.

    Parameters (as visible in this snippet):
        general_dic: shared object passed from the parent — unused in the
            visible code; TODO confirm it is needed.
        path: path to the .bz2 peakdata file for one 15-minute slot.
        time: timestamp for this slot — unused in the visible code.
        s1..s7: manager.list() proxies, one per strain channel.
        t1..t5: manager.list() proxies, one per temperature channel.
        t_sync: shared timestamp list — unused in the visible code;
            TODO confirm whether the elided code appends to it.
    """
    if os.path.isfile(path):
        # NOTE(review): df_conv is not defined in this snippet — presumably
        # the elided code loads `path` into a DataFrame here; confirm.
        # .T.max() on a 1-D column is simply the column maximum.
        s1.append(df_conv['C_strain_COY'].T.max())
        s2.append(df_conv['C_strain_CUY'].T.max())
        s3.append(df_conv['C_strain_ROX'].T.max())
        s4.append(df_conv['C_strain_CUX'].T.max())
        s5.append(df_conv['C_strain_CMX'].T.max())
        s6.append(df_conv['C_strain_COX'].T.max())
        s7.append(df_conv['C_strain_LOX'].T.max())
        t1.append(df_conv['C_temp_CUY'].T.max())
        t2.append(df_conv['C_temp_COY'].T.max())
        t3.append(df_conv['C_temp_CUX'].T.max())
        t4.append(df_conv['C_temp_CMX'].T.max())
        t5.append(df_conv['C_temp_COX'].T.max())
        print("file does exist at this time")
    else:
        # Missing file: pad every channel with NaN so list lengths stay equal.
        s1.append(np.nan)
        s2.append(np.nan)
        s3.append(np.nan)
        s4.append(np.nan)
        s5.append(np.nan)
        s6.append(np.nan)
        s7.append(np.nan)
        t1.append(np.nan)
        t2.append(np.nan)
        t3.append(np.nan)
        t4.append(np.nan)
        t5.append(np.nan)
If the file exists I take an aggregate of that data (the code above uses `.max()`) and store it in S1, S2, etc.
After `p.close()` and `p.join()` I write the data to Excel:
print("writing data")

# Output column name -> shared proxy list holding that column's values.
# Each proxy is converted back to a plain list before building the Series,
# since pandas cannot consume a ListProxy directly.
column_sources = {
    'TimeStamp': Time_Sync,
    'C_strain_COY': S1_Sync,
    'C_strain_CUY': S2_Sync,
    'C_strain_ROX': S3_Sync,
    'C_strain_CUX': S4_Sync,
    'C_strain_CMX': S5_Sync,
    'C_strain_COX': S6_Sync,
    'C_strain_LOX': S7_Sync,
    'C_temp_CUY': T1_Sync,
    'C_temp_COY': T2_Sync,
    'C_temp_CUX': T3_Sync,
    'C_temp_CMX': T4_Sync,
    'C_temp_COX': T5_Sync,
}
for column, proxy in column_sources.items():
    result[column] = pd.Series(convert_proxy(proxy))

# general_pd.sort_values(by='TimeStamp')
# NOTE(review): the columns above are assigned to `result`, yet `general_pd`
# is written out below — confirm these are the same DataFrame, otherwise the
# new columns never reach the Excel file.
# The context manager replaces writer.save(), which is deprecated and removed
# in pandas 2.0, and guarantees the file is finalized even on error.
with pd.ExcelWriter(r'c:\ahmed\median_data_meter_1_max.xlsx', engine='xlsxwriter') as writer:
    # Convert the dataframe to an XlsxWriter Excel object.
    general_pd.to_excel(writer, sheet_name='Sheet1')
I convert each proxy list into a regular list and save it to Excel.
The problem is that the processed data ends up at random positions relative to the timestamps I generate — some values land in the wrong rows of the Excel sheet.
Is this a synchronization problem? How do I merge the data from each worker process in the correct order?
This is how I generate the timestamps:
# Walk the date range in 15-minute steps, recording each slot's timestamp
# and the expected peakdata file path for that slot.
ts = datetime.datetime.strptime('2018-08-21', '%Y-%m-%d')
end_ts = datetime.datetime.strptime('2018-08-23', '%Y-%m-%d')
step = datetime.timedelta(minutes=15)
while ts < end_ts:
    print(ts)
    # Advance first, then record: the very first slot emitted is 00:15,
    # exactly as in the original code.
    ts = ts + step
    day_dir = ts.strftime("%Y-%m-%d")
    file_stamp = ts.strftime("%Y%m%d_%H%M")
    path = os.path.join(
        r'\\file01\vnxmo\\TIT\NAS\\Baudynamik\\_meter\\SpeedFT-meter1\\peakdata\\'
        + day_dir + r'\\peakdata_' + file_stamp + r'.bz2')
    TimeStamp.append(ts)
    list_files.append(path)