我正在嘗試創建一個需要在多個任務之間傳遞數據的Flyte工作流程。我查看了文檔中的示例之一,但嘗試盡可能少地重新創建 blob 傳遞,但我仍然無法讓它工作。這是我的完整工作流程定義(當然,我的實際用例會產生更多數據):from flytekit.sdk.tasks import python_task, outputs, inputsfrom flytekit.sdk.types import Typesfrom flytekit.sdk.workflow import workflow_class, Output, Input@inputs(the_text=Types.String)@outputs(the_blob=Types.Blob)@python_taskdef create_blob(wf_params, the_text, the_blob): fname = "a-file.txt" with open(fname, "w") as f: f.write(the_text) the_blob.set(fname)@inputs(the_blob=Types.Blob)@outputs(the_text=Types.String)@python_taskdef read_blob(wf_params, the_blob, the_text): the_blob.download() with open(the_blob.local_path) as f: the_text.set(f.read())@workflow_classclass PassBlob: input_text = Input(Types.String, required=True, help="The text to write to the file") create = create_blob(the_text=input_text) read = read_blob(the_blob=create.outputs.the_blob) output_text = Output(read.outputs.the_text, sdk_type=Types.String, help="The text read from the file")此工作流部署成功,當我運行它時,會發生以下情況:該create任務成功運行,并說明其輸出:the_blob: type: single uri: /4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6在任務之間傳遞 s 的正確方法是什么Types.Blob?我該如何進行這項工作?
在 Flyte 中的任務之間傳遞 blob
慕婉清6462132
2023-12-20 16:16:43