亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

使用循環分支 Apache Beam 管道

使用循環分支 Apache Beam 管道

喵喔喔 2023-06-20 17:27:24
我正在嘗試執行去規范化操作,我需要使用以下邏輯重新組織表:| itemid | class | value |+--------+-------+-------+| 1      | A     | 0.2   |       | itemid | value A | value B | value C || 1      | B     | 10.3  |  ==>  +--------+---------+---------+---------+| 2      | A     | 3.0   |  ==>  | 1      |   0.2   |  10.3   |         || 2      | B     | 0.2   |  ==>  | 2      |   3.0   |   0.2   |         || 3      | A     | 0.0   |       | 3      |   0.0   |   1.2   |   5.4   | | 3      | B     | 1.2   |  | 3      | C     | 5.4   |      我的方法是執行一個 for 循環以按 進行過濾class,前提是我知道先驗類列表,然后加入生成的 pcollection。高級代碼:CLASSES = ["A", "B", "C"]tables = [      (        data        | "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)        | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))    )    for cin CLASSES]和加入:_ = (     tables    | "Flatten" >> beam.Flatten()    | "Join Collections" >> beam.GroupByKey()    | "Remove key" >> beam.MapTuple(lambda _, val: val)    | "Merge dicts" >> beam.ParDo(mergeDicts())    | "Write to GCS" >> beam.io.WriteToText(output_file))與(根據 Peter Kim 的建議進行編輯):class mergeDicts(beam.DoFn):    process(self, elements):        result = {}        for dictionary in elements:            if len(dictionary)>0:                result["itemid"] = dictionary["itemid"]                result["value {}".format(dictionary["class"])] = dictionary["value"]        yield result我這里的問題是,當管道在 Apache Beam 計算引擎中執行時,我獲得了由列表的最后一個元素過濾的相同 pcollections,在本例中是 C。[已添加] 看起來 Apache Beam 引擎采用最終狀態的迭代變量,這意味著迭代列表的最后一個元素,用于所有調用的分支。我顯然采用了錯誤的方法,但是哪種方法應該是執行此操作的最佳方法?
查看完整描述

3 回答

?
Qyouu

TA貢獻1786條經驗 獲得超11個贊

您遇到的是關于閉包、循環和 Python 作用域的令人驚訝的問題。您可以通過分配變量而不是將其從閉包中拉出來解決這個問題。例如

tables = [??

? ? (

? ? ? ? data

? ? ? ? # Pass it as a side input to Filter.

? ? ? ? | "Filter by Language" >> beam.Filter(lambda elem, cls: elem["class"], c)

? ? ? ? | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))

? ? )

? ? for c in CLASSES

]

或者


tables = [??

? ? (

? ? ? ? data

? ? ? ? # Explicitly capture it as a default value in the lambda.

? ? ? ? | "Filter by Language" >> beam.Filter(lambda elem, cls=c: elem["class"])

? ? ? ? | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))

? ? )

? ? for c in CLASSES

]

分區在這里也很有效,既可以避免這個陷阱,也可以表達你的意圖。


查看完整回答
反對 回復 2023-06-20
?
寶慕林4294392

TA貢獻2021條經驗 獲得超8個贊

根據您顯示的結果表,我假設您希望輸出如下所示:


{'itemid': '1', 'value B': 10.3, 'value A': 0.2}

{'itemid': '2', 'value B': 0.2, 'value A': 3.0}

{'itemid': '3', 'value B': 1.2, 'value A': 0.0, 'value C': 5.4}

您的 mergeDicts 正在覆蓋值,因為字典每個鍵只能保存一個值。將 mergeDicts 更新為類似這樣的內容以指定鍵:


class mergeDicts(beam.DoFn):

    process(self, elements):

        result = {}

        for dictionary in elements:

            if len(dictionary)>0:

                result["itemid"] = dictionary["itemid"]

                result["value {}".format(dictionary["class"])] = dictionary["value"]

        yield result


查看完整回答
反對 回復 2023-06-20
?
函數式編程

TA貢獻1807條經驗 獲得超9個贊

我在這里發布一個我自己找到的解決方案,但我沒有檢查它是否為正確答案,因為我想更好地理解 Beam 引擎的執行邏輯。

為了根據條件獲得單獨的 pcollection,我沒有使用循環中的項目過濾表,而是使用了類beam.Partition。通過直接應用文檔中的代碼示例,我將 pcollection 分成多個表,準備加入。

這樣就避免了這個問題,但是我不清楚為什么 for 循環不能像我預期的那樣工作。


查看完整回答
反對 回復 2023-06-20
  • 3 回答
  • 0 關注
  • 215 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號