新手-ish python/pandas 用戶在這里。我一直在嘗試在 read_fwf 中使用 chunksize arg 并迭代 value_counts 變量。我編寫了一個函數來傳遞參數,例如文件迭代器和變量來解析和計數。我希望并行化這個函數,并能夠同時將 2 個文件讀入同一個函數。它似乎確實有效……但是,我的速度出乎意料地變慢了。線程在同一時間完成,但一個似乎正在減慢另一個(IO 瓶頸?)。通過按順序而不是并行運行函數(324 秒對 172 秒),我得到了更快的時間。想法?我在執行這個錯誤嗎?我試過多進程但啟動映射錯誤,我無法腌制文件迭代器(read_fwf 的輸出)。testdf1=pd.read_fwf(filepath_or_buffer='200k.dat',header=None,colspecs=wlist,names=nlist,dtype=object,na_values=[''],chunksize=1000) testdf2=pd.read_fwf(filepath_or_buffer='200k2.dat',header=None,colspecs=wlist,names=nlist,dtype=object,na_values=[''],chunksize=1000)def tfuncth(df,varn,q,*args): td={} for key in varn.keys(): td[key]=pd.Series() for rdf in df: if args is not None: for arg in args: rdf=eval(f"rdf.query(\"{arg}\")") for key in varn.keys(): ecode=f'rdf.{varn[key]}.value_counts()' td[key]=pd.concat([td[key],eval(ecode)]) td[key]=td[key].groupby(td[key].index).sum() for key in varn.keys(): td[key]=pd.DataFrame(td[key].reset_index()).rename(columns={'index':'Value',0:'Counts'}).assign(Var=key,PCT=lambda x:round(x.Counts/x.Counts.sum()*100,2))[['Var','Value','Counts','PCT']] q.put(td) bands={ '1':'A', '2':'B', '3':'C', '4':'D', '5':'E', '6':'F', '7':'G', '8':'H', '9':'I' } vdict={ 'var1':'e1270.str.slice(0,2)', 'var2':'e1270.str.slice(2,3)', 'band':'e7641.str.slice(0,1).replace(bands)' }更新:經過大量閱讀,這也是我得出的結論。這是非常簡單的結論,我敢肯定,所以如果有人知道,請告訴我。Pandas 不是一個完全多線程友好的包顯然有一個名為 'dask' 的包,它復制了很多 Pandas 函數。所以我會調查一下。在許多情況下,Python 并不是真正的多線程兼容語言因此,諸如“dask”之類的包可以利用多線程??梢苑蛛x多個線程,但只能并行非 CPU 綁定函數。我的代碼是用 IO 和 CPU 包裝的。簡單的 IO 可能是并行運行的,但會等待處理器執行。我計劃通過編寫僅 IO 操作并嘗試線程來測試這一點。Python 受其編譯器的約束。在純 python 中,它被 GIL 解釋和綁定,一次只執行一個線程Python 可以使用在線程上沒有全局解釋器鎖 (GIL) 的不同編譯器進行編譯。
1 回答

眼眸繁星
TA貢獻1873條經驗 獲得超9個贊
我確實設法通過使用多處理包來解決這個問題并解決我的問題。我遇到了兩個問題。
1) multiprocessing 包與 Juypter Notebook 不兼容
和
2)你不能pickle一個pandas閱讀器的句柄(傳遞給進程的多處理pickles對象)。
我通過在 Notebook 環境之外編碼來修復 1,我通過將打開分塊文件所需的參數傳遞給每個進程并讓每個進程開始自己的塊讀取來修復 2。
完成這兩件事后,我的速度比順序運行提高了 60%。
- 1 回答
- 0 關注
- 318 瀏覽
添加回答
舉報
0/150
提交
取消