1 Answer

It sounds like PartitionedDataSet or IncrementalDataSet could be useful to you.
They let you split similar data into separate chunks, determined by file, and repeat operations over those chunks as you see fit.
So instead of launching x pipelines with y nodes each, you have a single pipeline of y nodes that processes x chunks of data.
More on IncrementalDataSet in this video: https://www.youtube.com/watch?v=v7JSSiYgqpg
# nodes.py
from typing import Any, Dict, Callable

def _dict_mapper(dictionary: Dict[str, Any], fun: Callable):
    # Apply your function to every value of the dictionary
    return {k: fun(v) for k, v in dictionary.items()}

def node_0(list_of_strings: Dict[str, str]):
    return _dict_mapper(list_of_strings, lambda x: int(x))

def node_1(list_of_numbers: Dict[str, int]):
    return _dict_mapper(list_of_numbers, lambda x: x + 1)

def node_2(list_of_numbers: Dict[str, int]):
    return _dict_mapper(list_of_numbers, lambda x: str(x))

def node_3(list_of_strings: Dict[str, str]):
    return _dict_mapper(list_of_strings, lambda x: f'{x}_suffix')
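To make the data flow concrete: with an IncrementalDataSet the input arrives as a plain dictionary keyed by partition id (the filename minus the suffix), with each value being the text content of that file. The partition names and contents below are made up purely for illustration:

data = {'file_001': '41', 'file_002': '7'}   # as loaded from the text files

step_0 = node_0(data)    # {'file_001': 41, 'file_002': 7}
step_1 = node_1(step_0)  # {'file_001': 42, 'file_002': 8}
step_2 = node_2(step_1)  # {'file_001': '42', 'file_002': '8'}
print(node_3(step_2))    # {'file_001': '42_suffix', 'file_002': '8_suffix'}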
# catalog.yml
data:
  type: IncrementalDataSet
  dataset: text.TextDataSet
  path: folder/with/text_files/each/containing/single/number/
  filename_suffix: .txt
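One detail worth knowing about IncrementalDataSet: it keeps a checkpoint and only loads partitions that have appeared since the last confirmed run, and the checkpoint only advances when a node confirms the dataset. A minimal sketch, assuming Kedro's confirms mechanism on node():

# Confirm the 'data' entry once the first node has run successfully,
# so the next pipeline run only picks up newly added partitions.
node(node_0, inputs='data', outputs='0', confirms='data')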
# pipeline.py
from kedro.pipeline import Pipeline, node

Pipeline([
    node(node_0, inputs='data', outputs='0'),
    node(node_1, inputs='0', outputs='1'),
    node(node_2, inputs='1', outputs='2'),
    node(node_3, inputs='2', outputs='final_output'),
])
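If you go with PartitionedDataSet instead, the catalog entry looks the same apart from the type, but the dataset loads lazily: the input arrives as a dictionary of no-argument load callables rather than loaded strings, so you need one extra node in front of node_0 to materialise them. A minimal sketch; load_partitions is a name I made up:

from typing import Any, Callable, Dict

def load_partitions(partitions: Dict[str, Callable[[], Any]]) -> Dict[str, str]:
    # Each value is a load function; call it to get the file contents.
    return {name: load() for name, load in partitions.items()}

In pipeline.py you would then add node(load_partitions, inputs='data', outputs='loaded') and point node_0 at 'loaded' instead of 'data'.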