1 回答
TA贡献1765条经验 获得超5个赞
这看起来 PartitionedDataSet 或 IncrementalDataSet 可能对您有用。
它们允许您将相似的数据分成单独的块,由文件确定,并在您认为合适的情况下对这些块重复操作。
因此,与其启动包含 y 个节点的 x 个管道,不如让一个包含 y 个节点的管道处理 x 个数据块。
有关此视频中 IncrementalDataSet 的更多信息:https ://www.youtube.com/watch?v=v7JSSiYgqpg
# nodes.py
from typing import Any, Dict, Callable
def _dict_mapper(dictionary: Dict[str, Any], fun: Callable):
# Apply your function to the dictionary mapping
return {k: fun(v) for k, v in dictionary.items()}
def node_0(list_of_strings: Dict[str, str]):
return _dict_mapper(list_of_strings, lambda x: int(x))
def node_1(list_of_numbers: Dict[str, int]):
return _dict_mapper(list_of_numbers, lambda x: x+1)
def node_2(list_of_numbers: Dict[str, int]):
return _dict_mapper(list_of_numbers, lambda x: str(x))
def node_3(list_of_strings: Dict[str, str]):
return _dict_mapper(list_of_strings, lambda x: f'{x}_suffix')
# catalog.yml
data:
type: IncrementalDataSet
dataset: text.TextDataSet
path: folder/with/text_files/each/containing/single/number/
filename_suffix: .txt
# pipeline.py
Pipeline([
node(node_0, inputs='data', outputs='0'),
node(node_1, inputs='0', outputs='1'),
node(node_2, inputs='1', outputs='2'),
node(node_3, inputs='2', outputs='final_output'),
])
添加回答
举报