近期有將 Quandl 美國股價歷史資料匯入 Mysql 中,並且與他人做資料的比對,發現資料上會有誤差的情形,對方也有提供給我一份有誤差資料的清單:
需要確認從 Quandl 下載的歷史資料是否正確,若資料是正確的但匯入後的 DB 資料有問題,那就需要再另查問題了。
此處使用 pandas read_csv 來進行資料的比對,但馬上遇到一個問題,歷史有三千多萬筆的資料,如果直接透過 read_csv 來讀取資料,記憶體會直接耗盡:
不過也不用太擔心,read_csv 有提供 chunksize 參數,用途是分塊讀取檔案,chunksize 的值代表一次要讀入多少數據,範例如下:
import pandas as pd from datetime import datetime import os def do_somthing(): ... if __name__ == '__main__': chunksize = 10 ** 6 # 設定每次取出的數量 largeFile = "D:/pythonProject/stockProject/QuandlDailyUSdata/historyData/EOD.csv" for chunkData in pd.read_csv(largeFile, chunksize=chunksize): print("%s ~ %s 資料處理中..." % (total, total+chunksize)) total = total + chunksize do_somthing(chunkData)
再來是資料的比對,pandas 提供了強大的 merge 功能,merge 有很多種的變化,這邊先針對此次案例要達到的目的來做說明,我有實做一個 getBothData 的 function,參數為兩個要比較的 dataframe 資料,目的為取出共同存在的資料並另存到其他的 csv 檔案,pandas merge 函式參數說明:
1. on=dup_cols
為兩份資料共同的部分,以 DB 來比喻的話,可以看成 JOIN ON 欄位。
2. how=left
是左匹配,以左邊的 df1 為基礎匹配右邊的 df2 中的內容。匹配不到的內容會以 NaN 值顯示。
3. indicator=’Exist’
顯示 merge 的 mode,可以知道 merge 後的結果,有三種可能(left_only、right_only、both),有三種資料可以輸入:
- False (Default)
- True (欄位名稱為 _merge)
- 任意字串 (若輸入 Exist 則欄位名稱為 Exist)
def getBothData(df1, df2): bothFile = 'your_file.csv' dup_cols = ['symbol', 'date'] bothData = pd.merge(df1, df2, on=dup_cols, how='left', indicator='Exist').query('Exist=="both"') if os.path.isfile(bothFile): bothData.to_csv(bothFile, mode='a', index=False, header=False) else: bothData.to_csv(bothFile, index=False) print(bothData)
完整的代碼如下:
import pandas as pd from datetime import datetime import os def getBothData(df1, df2): bothFile = 'your_file.csv' dup_cols = ['symbol', 'date'] bothData = pd.merge(df1, df2, on=dup_cols, how='left', indicator='Exist').query('Exist=="both"') if os.path.isfile(bothFile): bothData.to_csv(bothFile, mode='a', index=False, header=False) else: bothData.to_csv(bothFile, index=False) print(bothData) if __name__ == '__main__': total = 0 diffFile = pd.read_csv('比對檔案 絕對路徑') diffFile['date'] = [datetime.strptime(d, "%Y/%m/%d").strftime('%Y-%m-%d') for d in diffFile['date'].values.tolist()] # 轉換日期格式 chunksize = 10 ** 6 # 設定每次取出的數量 largeFile = "largeFile 絕對路徑" for chunkData in pd.read_csv(largeFile, chunksize=chunksize): print("%s ~ %s 資料處理中..." % (total, total+chunksize)) total = total + chunksize getBothData(chunkData, diffFile)
參考資料:
留言列表