亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

基于數據集分區/迭代器邏輯的流水線執行的動態實例

基于數據集分區/迭代器邏輯的流水線執行的動態實例

心有法竹 2023-01-04 16:23:43
不確定這是否可能,但這是我正在嘗試做的:-我想將函數的部分(步驟)提取為單個節點(到目前為止還可以),但要注意的是我在步驟之上有一個迭代器,它依賴于數據集上的某些邏輯,即重復相同的操作(這是獨立)在數據集的邏輯分區上。示例代碼def single_node(list_of_numbers):   modified_list = [] # to store all output   for x in list_of_numbers: # iteration logic      x+=1 # Step 1      x=str(x) # Step 2      x+="_suffix" # Step 3      modified_list.append(x) # append to final output   return modified_list # return語境在提供的示例中,假設當前我有一個執行所有步驟的節點。因此,當前管道有一個節點接受 1 個輸入并返回 1 個輸出。隨著我的步驟的復雜性增加,我想將它們作為單獨的節點公開。所以我創建了另一個管道,將這 3 個步驟作為單獨的節點并將它們連接在一起。(他們的輸入和輸出)但是我的總體要求沒有改變,我想遍歷 中的所有值list_of_numbers,并且對于這個列表中的每個元素我想調用這個新管道。最后我想合并所有運行的輸出并生成一個輸出??雌饋碛悬c類似于基于數據集擴展的動態圖(管道的多個動態實例)。需要考慮的其他要點,我的輸入是單個文件。假設我根據一些定義為節點的邏輯對數據集進行分區。所以這個節點可以有多個輸出。(確切的計數完全取決于數據集,這里是列表的大?。τ跀祿鞴濣c的每個輸出,我需要“生成”一個管道。最后,合并所有“衍生”管道的輸出。(此邏輯可以再次在具有多個動態輸入的合并節點中定義)。有沒有辦法做到這一點?謝謝!
查看完整描述

1 回答

?
POPMUISE

TA貢獻1765條經驗 獲得超5個贊

這看起來 PartitionedDataSet 或 IncrementalDataSet 可能對您有用。

它們允許您將相似的數據分成單獨的塊,由文件確定,并在您認為合適的情況下對這些塊重復操作。

因此,與其啟動包含 y 個節點的 x 個管道,不如讓一個包含 y 個節點的管道處理 x 個數據塊。

有關此視頻中 IncrementalDataSet 的更多信息:https ://www.youtube.com/watch?v=v7JSSiYgqpg

# nodes.py


from typing import Any, Dict, Callable


def _dict_mapper(dictionary: Dict[str, Any], fun: Callable):

  # Apply your function to the dictionary mapping

  return {k: fun(v) for k, v in dictionary.items()}


def node_0(list_of_strings: Dict[str, str]):

  return _dict_mapper(list_of_strings, lambda x: int(x))


def node_1(list_of_numbers: Dict[str, int]):

  return _dict_mapper(list_of_numbers, lambda x: x+1)


def node_2(list_of_numbers: Dict[str, int]):

  return _dict_mapper(list_of_numbers, lambda x: str(x))


def node_3(list_of_strings: Dict[str, str]):

  return _dict_mapper(list_of_strings, lambda x: f'{x}_suffix')



# catalog.yml

data:

  type: IncrementalDataSet

  dataset: text.TextDataSet

  path: folder/with/text_files/each/containing/single/number/

  filename_suffix: .txt


# pipeline.py


Pipeline([

  node(node_0, inputs='data', outputs='0'),

  node(node_1, inputs='0', outputs='1'),

  node(node_2, inputs='1', outputs='2'),

  node(node_3, inputs='2', outputs='final_output'),

])


查看完整回答
反對 回復 2023-01-04
  • 1 回答
  • 0 關注
  • 140 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號