背景我需要向大約 100 萬臺設備發送大量通知,我正在使用 Google Cloud Functions 構建它。在當前設置中,我將每個設備令牌作為一個 PubSub 消息排隊:在 DataStore 中存儲待處理通知,用于跟蹤重試和成功狀態嘗試發送通知如果重試次數足夠多且未通過,則將通知標記為成功或失敗此過程由人工上傳帶有所有令牌的 CSV 文件手動啟動。原則上內置的重試應該足夠了,但我想確保如果云功能本身或 APNs/FCM 出現問題,我可以以與上傳相同的格式返回所有失敗令牌的 CSV 文件,以便當/如果用戶認為這是一個好主意時,他們只能重試失敗的那個。我將通知作為作業的一部分運行,用于與通知的狀態一起查詢。為此,我設置了一個復合索引,job_id并對status所有匹配的通知運行查詢,并希望將其作為文件流式傳輸給用戶或將其存儲在 Google Cloud Storage 中,以便用戶可以從那里。問題假設接近總通知數量失敗并且我想在一個文件中獲取所有令牌,我的第一個實現只是迭代所有匹配的條目并構建結果。問題是,以這種方式檢索它們時,每 100_000 個條目大約需要 1 分鐘。對于接近所有通知的事情,我將超過 Cloud Functions 的最大超時時間。每個實體總共大約 300 個字節,這使得整個導出大約 300MB。我可能可以通過添加一個更大的索引將其減少到大約一半/三分之二的大小,讓我只對我想要的字段進行投影。我能想到的唯一替代方法是將通知分片以將整個組分成 100 個分片,創建 100 個文件,每個文件包含 10k 條通知,然后將它們全部下載并在用戶嘗試下載文件時將它們拼接在一起。我發布這個問題的原因是,這感覺像是一個相對簡單的問題,而且這個解決方案感覺比我預期的要復雜一些,所以我想我可能會遺漏一些東西。問題我是否缺少一種顯而易見的、更簡單的方法來實現我想要的?分片是否只是進行此類事情的預期方式,我應該接受這種復雜性嗎?代碼為了清楚起見,這是我正在運行的代碼片段,我只是迭代它返回的響應以生成輸出。def get_failures(job_id):
query = client.query(kind = Notification.kind)
query.add_filter('job_id', '=', str(job_id))
query.add_filter('status', '=', "failure")
return query.fetch()
1 回答

LEATH
TA貢獻1936條經驗 獲得超7個贊
這個問題的強大解決方案是使用 Google Dataflow。我目前使用它來完成此操作,在 Google Cloud Storage 中生成 csv 文件,其中包含與給定數據存儲查詢匹配的所有約 500k 條記錄。
不過,設置它可能有點復雜。
在開始之前,我使用了 10 分鐘超時而不是 30 秒超時的 Google 任務隊列。我不確定您是否可以純粹在云函數中執行此操作,或者您是否需要創建一個簡單的應用引擎項目來充當這些任務的請求處理程序
添加回答
舉報
0/150
提交
取消