为了账号安全,请及时绑定邮箱和手机立即绑定

Dask - 是否可以通过自定义函数使用每个工作线程中的所有线程?

Dask - 是否可以通过自定义函数使用每个工作线程中的所有线程?

开心每一天1111 2023-06-27 18:11:17
就我而言,我在 S3 中有多个文件和一个自定义函数,该函数读取每个文件并使用所有线程处理它。为了简化示例,我只生成一个数据帧df,并且假设我的函数是tsfresh.extract_features使用多重处理。生成数据import pandas as pdfrom tsfresh import extract_featuresfrom tsfresh.examples.robot_execution_failures import download_robot_execution_failures, \load_robot_execution_failuresdownload_robot_execution_failures()ts, y = load_robot_execution_failures()df = []for i in range(5):    tts = ts.copy()    tts["id"] += 88 * i    df.append(tts)    df = pd.concat(df, ignore_index=True)功能def fun(df, n_jobs):    extracted_features = extract_features(df,                                      column_id="id",                                      column_sort="time",                                      n_jobs=n_jobs)簇import daskfrom dask.distributed import Client, progressfrom dask import compute, delayedfrom dask_cloudprovider import FargateClustermy_vpc = # your vpcmy_subnets = # your subnetscpu = 2 ram = 4cluster = FargateCluster(n_workers=1,                         image='rpanai/feats-worker:2020-08-24',                         vpc=my_vpc,                         subnets=my_subnets,                         worker_cpu=int(cpu * 1024),                         worker_mem=int(ram * 1024),                         cloudwatch_logs_group="my_log_group",                         task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess'],                         scheduler_timeout='20 minutes'                        )cluster.adapt(minimum=1,              maximum=4)client = Client(cluster)client使用所有工作线程(失败)to_process = [delayed(fun)(df, cpu) for i in range(10)]out = compute(to_process)AssertionError: daemonic processes are not allowed to have children仅使用一个线程(OK)在这种情况下,它工作正常,但我浪费资源。to_process = [delayed(fun)(df, 0) for i in range(10)]out = compute(to_process)问题我知道对于这个特定的功能,我最终可以使用多线程和其他一些技巧编写一个自定义分配器,但我想分配一个工作,让每个工作人员都可以利用所有资源,而不必担心太多。
查看完整描述

1 回答

?
元芳怎么了

TA贡献1798条经验 获得超7个赞

我可以帮助回答您的具体问题tsfresh,但 iftsfresh只是一个简单的玩具示例,可能不是您想要的。

对于tsfresh,您通常不会混合使用tsfreshdask 和 dask 的多重处理,而是让 dask 执行所有处理。这意味着,您从一个单一的开始dask.DataFrame(在您的测试用例中,您可以将 pandas 数据帧转换为 dask 数据帧 - 对于您的读取用例,您可以直接从S3 docu读取),然后在 dask 数据帧中分发特征提取(特征提取的好处是,它在每个时间序列上独立工作。因此我们可以为每个时间序列生成一个作业)。


我不确定这是否有助于解决您更普遍的问题。在我看来,你(在大多数情况下)不想混合dask的分布函数和“本地”多核计算,而只是让dask处理一切。因为如果您位于 dask 集群上,您甚至可能不知道每台机器上有多少个核心(或者每个作业可能只获得一个核心)。

这意味着,如果您的作业可以分发 N 次,并且每个作业将启动 M 个子作业,您只需将“N x M”作业交给 dask 并让它计算其余部分(包括数据局部性)。


查看完整回答
反对 回复 2023-06-27
  • 1 回答
  • 0 关注
  • 71 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信