为了账号安全,请及时绑定邮箱和手机立即绑定

基于数据集分区/迭代器逻辑的流水线执行的动态实例

基于数据集分区/迭代器逻辑的流水线执行的动态实例

心有法竹 2023-01-04 16:23:43

不确定这是否可能,但这是我正在尝试做的:-


我想将函数的部分(步骤)提取为单个节点(到目前为止还可以),但要注意的是我在步骤之上有一个迭代器,它依赖于数据集上的某些逻辑,即重复相同的操作(这是独立)在数据集的逻辑分区上。


示例代码

def single_node(list_of_numbers):

   modified_list = [] # to store all output

   for x in list_of_numbers: # iteration logic

      x+=1 # Step 1

      x=str(x) # Step 2

      x+="_suffix" # Step 3

      modified_list.append(x) # append to final output

   return modified_list # return

语境

  1. 在提供的示例中,假设当前我有一个执行所有步骤的节点。

  2. 因此,当前管道有一个节点接受 1 个输入并返回 1 个输出。

  3. 随着我的步骤的复杂性增加,我想将它们作为单独的节点公开。所以我创建了另一个管道,将这 3 个步骤作为单独的节点并将它们连接在一起。(他们的输入和输出)

  4. 但是我的总体要求没有改变,我想遍历 中的所有值list_of_numbers,并且对于这个列表中的每个元素我想调用这个新管道。最后我想合并所有运行的输出并生成一个输出。

看起来有点类似于基于数据集扩展的动态图(管道的多个动态实例)。

需要考虑的其他要点,

  1. 我的输入是单个文件。假设我根据一些定义为节点的逻辑对数据集进行分区。所以这个节点可以有多个输出。(确切的计数完全取决于数据集,这里是列表的大小)

  2. 对于数据迭代器节点的每个输出,我需要“生成”一个管道。

  3. 最后,合并所有“衍生”管道的输出。(此逻辑可以再次在具有多个动态输入的合并节点中定义)。

有没有办法做到这一点?谢谢!


查看完整描述

1 回答

?
POPMUISE

TA贡献1511条经验 获得超5个赞

这看起来 PartitionedDataSet 或 IncrementalDataSet 可能对您有用。

它们允许您将相似的数据分成单独的块,由文件确定,并在您认为合适的情况下对这些块重复操作。

因此,与其启动包含 y 个节点的 x 个管道,不如让一个包含 y 个节点的管道处理 x 个数据块。

有关此视频中 IncrementalDataSet 的更多信息:https ://www.youtube.com/watch?v=v7JSSiYgqpg

# nodes.py


from typing import Any, Dict, Callable


def _dict_mapper(dictionary: Dict[str, Any], fun: Callable):

  # Apply your function to the dictionary mapping

  return {k: fun(v) for k, v in dictionary.items()}


def node_0(list_of_strings: Dict[str, str]):

  return _dict_mapper(list_of_strings, lambda x: int(x))


def node_1(list_of_numbers: Dict[str, int]):

  return _dict_mapper(list_of_numbers, lambda x: x+1)


def node_2(list_of_numbers: Dict[str, int]):

  return _dict_mapper(list_of_numbers, lambda x: str(x))


def node_3(list_of_strings: Dict[str, str]):

  return _dict_mapper(list_of_strings, lambda x: f'{x}_suffix')



# catalog.yml

data:

  type: IncrementalDataSet

  dataset: text.TextDataSet

  path: folder/with/text_files/each/containing/single/number/

  filename_suffix: .txt


# pipeline.py


Pipeline([

  node(node_0, inputs='data', outputs='0'),

  node(node_1, inputs='0', outputs='1'),

  node(node_2, inputs='1', outputs='2'),

  node(node_3, inputs='2', outputs='final_output'),

])


查看完整回答
反对 回复 2023-01-04

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信