3 回答

TA貢獻1786條經驗 獲得超11個贊
您遇到的是關于閉包、循環和 Python 作用域的令人驚訝的問題。您可以通過分配變量而不是將其從閉包中拉出來解決這個問題。例如
tables = [??
? ? (
? ? ? ? data
? ? ? ? # Pass it as a side input to Filter.
? ? ? ? | "Filter by Language" >> beam.Filter(lambda elem, cls: elem["class"], c)
? ? ? ? | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
? ? )
? ? for c in CLASSES
]
或者
tables = [??
? ? (
? ? ? ? data
? ? ? ? # Explicitly capture it as a default value in the lambda.
? ? ? ? | "Filter by Language" >> beam.Filter(lambda elem, cls=c: elem["class"])
? ? ? ? | "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
? ? )
? ? for c in CLASSES
]
分區在這里也很有效,既可以避免這個陷阱,也可以表達你的意圖。

TA貢獻2021條經驗 獲得超8個贊
根據您顯示的結果表,我假設您希望輸出如下所示:
{'itemid': '1', 'value B': 10.3, 'value A': 0.2}
{'itemid': '2', 'value B': 0.2, 'value A': 3.0}
{'itemid': '3', 'value B': 1.2, 'value A': 0.0, 'value C': 5.4}
您的 mergeDicts 正在覆蓋值,因為字典每個鍵只能保存一個值。將 mergeDicts 更新為類似這樣的內容以指定鍵:
class mergeDicts(beam.DoFn):
process(self, elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result

TA貢獻1807條經驗 獲得超9個贊
我在這里發布一個我自己找到的解決方案,但我沒有檢查它是否為正確答案,因為我想更好地理解 Beam 引擎的執行邏輯。
為了根據條件獲得單獨的 pcollection,我沒有使用循環中的項目過濾表,而是使用了類beam.Partition
。通過直接應用文檔中的代碼示例,我將 pcollection 分成多個表,準備加入。
這樣就避免了這個問題,但是我不清楚為什么 for 循環不能像我預期的那樣工作。
添加回答
舉報